1、RabbitMQ基础简介
消息中间件
专业术语
消息队列概述
消息产生途径
应用场景
传输模式
队列模式
主题模式
RabbitMQ模式
三大组件
虚拟主机
消息确认
消息持久化
消息预取
Exchange(交换器)
Binding
Binding key
Routing Key
Exchange类型
广播模式fanout
组播模式Direct
topic
headers
消息流
专业术语
消息队列概述
消息产生途径
应用场景
传输模式
队列模式
主题模式
RabbitMQ模式
三大组件
虚拟主机
消息确认
消息持久化
消息预取
Exchange(交换器)
Binding
Binding key
Routing Key
Exchange类型
广播模式fanout
组播模式Direct
topic
headers
消息流
消息中间件
大部分消息中间件都基于AMQP协议研发,AMQP即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然,AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全;
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗,充分利用RabbitMQ提供的这些功能就可以处理我们绝大部分的异步业务了;
专业术语
Broker:简单来说就是消息队列服务器实体,或者说Borker就是RabbitMQ本身;
Exchange:消息交换机,它指定消息按什么规则路由到哪个队列;
Queue:消息队列载体,每个消息都会被投入到一个或多个队列,一个队列不受任何限制,它可以存储你想要存储的消息量,它本质上是一个无限的缓冲区,多个生产者可以向同一个队列发送消息,多个消费者可以尝试从同一个消息队列中接收数据,一个队列像下面这样(上面是它的队列名称);
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来;
Routing Key:路由关键字,exchange根据这个关键字进行消息投递;
Binding Key:BindingKey是Exchange和Queue绑定的规则描述,用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程;
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离;
producer:消息生产者,就是投递消息的程序;
consumer:消息消费者,就是接受消息的程序;
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务,换句话说一旦TCP连接建立起来,客户端紧接着就会创建一个AMQP信道(Channel),并且每个信道都会被指派一个唯一的ID;;
connection:无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接也就是 Connection;
消息队列概述
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。其主要用途:不同进程Process/线程Thread之间通信,消息队列技术是分布式应用间交换信息的一种技术,消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走,通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置,或在继续执行前不需要等待接收程序接收此消息;
MQ主要作用是接受和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员,MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息,实现异步处理,大并发操作;
消息产生途径
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
应用场景
对一个消息中间件它存在的主要作用是为了分布式应用当中各组件间的解耦,像传统的C/S协议,C发出请求,S必须得在线,否则这个通信就没法完成,但是消息型,中间件的存在就打破了这种法则,每一个请求发出者,称之为消息生产者,在发出请求时,是直接把请求扔给中间件的,由中间件负责存储和转发,如果此时消费者不在线,依然并不妨碍此次请求的正常服务,所以它实现了各组件间的解耦;
在很多时候,我们都会遇到这样一个问题,程序运行在多机之上,并行这些运行在多机之上的服务在需要进行相互调用的时候,这是原始的技术方案是很头疼的问题,会遇到各种远程调用的问题,最直观最常见的就是网络延迟,并发同步等问题,那么消息服务就可以很轻松地解决这些问题,消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,可以把它用于系统间服务的相互调用(RPC);
传输模式
RabbitMQ和一般的消息传输模式,队列模式&主题模式区别;
队列模式
一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复;
主题模式
对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息;
RabbitMQ模式
生产者生产消息后不直接直接发到队列中,而是发到一个交换空间:Exchange,Exchange会根据Exchange类型和Routing Key来决定发到哪个队列中,这个讲到发布订阅在详细来看;
三大组件
消息型中间件最核心的组件有三个,第一交换器exchange,交换器的主要作用在于,把收到的消息交换至订阅者的队列,一个订阅者对应着一个队列,而每一个订阅者与队列之间建立通信的这么一个关联的信号,我们称之为一个binding,一般来讲一个订阅者对应一个队列,消息到底应该被放到哪一个队列,这取决于我们的路由模式;
路由模式通常经过绑定器进行定义,路有种类有多种,比如像Direct点到点的直接路由,Topic话题型路由,fan-out,Headers基于首部进行过滤,我们也可以称之为过滤器,不管是基于哪种路由模型,我们一旦定义了它的路由机制,其实就相当于将某个队列与这个交换器上的某一类消息产生了绑定,所以这个路由规则,我们通常称之为绑定器,因此对于一个对应的消息型中间件,关键组件有三个,exchange、binding、queue;
以Headers为例,每一个发出的消息,这个消息被扔到Borker上之后,它的组成部分主要有三个片段,第一个是Header,也就是我们的消息首部,就是这个消息发往消息中间件之后,消息中间件自己会给它加上首部,第二个是属性,就是用来描述这个对应的消息有多大发出时间等等,第三个是body,消息主体。三个部分共同组成,所以路由时,我们也可以基于Headers的某些首部进行路由;
所有的生产者,通常都是通过Borker的API将消息提交给Borker的exchange,由exchange根据binding(哪些队列binding了哪些消息),将其路由至指定的queue,而后由指定的订阅者从queue中取出消息;
虚拟主机
AMQP中还定义了第四个组件,virtualhost虚拟主机,虚拟主机主要是为了实现应用隔离的,每一个虚拟主机内部都可以有以上三大组件exchange、binding、queue,就相当于每一个组件就是一个逻辑的Borker一样,virtualhost将一个物理的Borker划分成多个不同应用,并且他们相互隔离,我们也可以给不同的vhost分配不同的user来管理;
消息确认
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;
如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开;
这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug,也就是Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑;
消息持久化
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
消息预取
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息;
Exchange(交换器)
在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃),Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange Types一节介绍。
Binding
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了;
Binding key
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明,在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key,binding key并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue;
对于消息发布者而言它只负责把消息发布出去,甚至它也不知道消息是发到哪个queue,消息通过exchange到达queue,exchange的职责非常简单,就是一边接收发布者的消息一边把这些消息推到queue中。而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者) —> x(exchange) bindding(绑定关系也就是我们的routingkey) 红色代表着queue;
Routing Key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效,在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给
Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes;
Exchange类型
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种;
广播模式fanout
fanout在一种类似广播的模式,Exchange路由规则非常简单,这种模式不需要Routing Key,只需要把所有消息发送到Exchange,然后由Exchange将所有的消息路由到所有与它绑定的Queue中,如图生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费,这是一对多的方式,一条消息供所有消费者消费;
有一个重点,就是如果Queue接受到了Exchange发送来的消息,但是Queue下面并没有任何消费者,那么此时,所有消息将会被抛弃;
组播模式Direct
Direct类型的Exchange会把消息路由到那些binding key与routing key完全匹配的Queue中,如图,我们以routingKey=”error”发送消息到Exchange,则消息会路由到C2和C3;如果我们以routingKey=”info”来发送消息,那么消息只会路由到C1。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中;
topic
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,说白了routingkey支持正则了,它约定:
routing key为一个句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”;
binding key与routing key一样也是句点号“. ”分隔的字符串;
binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个);
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey;
headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
消息流
(1)客户端连接到消息队列服务器,打开一个channel;
(2)客户端声明一个exchange,并设置相关属性;
(3)客户端声明一个queue,并设置相关属性;
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系;
(5)客户端投递消息到exchange;