发布网友 发布时间:2022-09-12 06:09
共1个回答
热心网友 时间:2024-06-30 05:39
RabbitMQ是用Erlang语言编写的分布式消息中间件,常常用在大型网站中作为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。
在使用RabbitMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。
Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:
(1)Exchange为fanout时,生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用;
(2)Exchange为topic时,生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息;
(3)direct类型的Exchange是最直接最简单的,生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)
消费端yml配置:
在消费端,配置prefectch和concurrency参数便可以实现消费端mq并发处理消息,那么这两个参数具有有什么含义呢?
prefetch是每次从一次性从broker里面取的待消费的消息的个数。
每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。
prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。
不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。
对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1。
如果要保证消息的可靠不丢失,当prefetch大于1时,可能会出现因为服务宕机引起的数据丢失,故建议将prefetch=1。
concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数。在上面的yml配置中,concurrency=1,即每个Listener容器将开启一个线程去处理消息。在2.0以后的版本中,可以在注解中配置该参数:
在服务启动后,可以发现在Listener容器中产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。
若一个消费者配置prefetch=10,concurrency=2,会有两个消费者(或是线程)同时监听Queue,但是注意这里的消息只要有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值10,意味着每个消费者每次会预取10个消息准备消费(注意不是两个消费者去共享内存中抓取的消息)。
每个消费者对应的listener有个Exclusive参数,默认为false, 如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。
前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。
设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。