发布网友 发布时间:2024-10-01 17:31
共1个回答
热心网友 时间:2024-10-17 20:18
场景服务是多节点部署的,配置由于很少改动又经常被查询,所以将配置数据放入本地缓存中,修改配置后,更新缓存,但是只能更新当前访问节点的缓存,无法修改其他节点的缓存,导致查询数据出现问题。
分析如何在其中一个节点更新缓存也同时更新其他节点的缓存呢?消息队列是否可以解决此问题呢?
解决使用Rabbit的广播模式,可以实现,使用交换机绑定队列,而队列在每一个服务节点启动后都生成一个唯一队列和交换机绑定,这样就实现了当发现消息给交换机,会有多个不同的队列对消息进行处理
实现创建配置,声明交换机,动态队列,绑定交换机队列,绑定监听
/***@author子羽*@Description广播模式清除本地缓存*@Date2021/8/27*/@ConfigurationpublicclassCacheRabbitConfig{publicstaticfinalStringMY_FANOUTEXCHANGE_NAME="local-cache-exchange";/***用UUID来生成一个queue名称,也可以用服务器IP端口作为queue名称*/publicstaticfinalStringMY_QUEUE_NAME=UUID.randomUUID().toString();@AutowiredRabbitTemplaterabbitTemplate;/***创建动态queue自动删除队列,不然会造成队列堆积*@return*/@BeanpublicQueuemyQueue(){returnnewQueue(MY_QUEUE_NAME,true,false,true);}/***创建Exchange*@return*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(MY_FANOUTEXCHANGE_NAME,true,false);}/***绑定当前queue到Exchange*@return*/@BeanpublicBindingbindingExchangeMyQueue(){returnBindingBuilder.bind(myQueue()).to(fanoutExchange());}/***设置消息处理*@return*/@BeanpublicSimpleMessageListenerContainermqMessageContainer(ClearCacheListenerclearCacheListener){SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());container.setQueueNames(MY_QUEUE_NAME);container.setExposeListenerChannel(true);//设置每个消费者获取的最大的消息数量container.setPrefetchCount(1);//消费者个数container.setConcurrentConsumers(1);//设置确认模式为手工确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息处理类container.setMessageListener(clearCacheListener);returncontainer;}}设置监听实现
/***@author子羽*@Description监听删除缓存*@Date2021/8/27*/@ComponentpublicclassClearCacheListenerimplementsChannelAwareMessageListener{privatestaticfinalLoggerlog=LoggerFactory.getLogger(ClearCacheListener.class);@AutowiredprivateSysConfigServicesysConfigService;@OverridepublicvoidonMessage(Messagemessage,Channelchannel){try{log.info("接收删除系统配置缓存消息,correlationId:[{}]",newString(message.getMessageProperties().getCorrelationId()));//删除本地缓存sysConfigService.cleanLocal();channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){log.error("处理清除缓存消息失败",e);}}}建立消息生产者
/***@author子羽*@descript清除内外部多节点缓存*/@ComponentpublicclassClearCacheProducerextendsAbstractRabbitProducer{protectedstaticfinalLoggerlogger=LogManager.getCurrentClassLogger();publicClearCacheProducer(RabbitTemplaterabbitTemplate){super(rabbitTemplate,MQConstant.APP_ID);}publicvoidsendDelSystemCacheMessage(){CorrelationDatacorrelationData=getCorrelationData();logger.info("发送消息至缓存队列,correlationId:{}消息:[{}]",correlationData.getId(),"清除系统配置缓存");this.sendMsg("local-cache-exchange","","",correlationData);logger.info("发送消息成功correlationId:{}",correlationData.getId());}}调用清除节点缓存
/***<清空缓存>**@see[类#方法]*/publicvoidclean(){//先清除本节点缓存this.cache.clean();//清除内外部节点系统配置缓存clearCacheProducer.sendDelSystemCacheMessage();}结语整体实现还是比较简单的,主要的一个比较难想到的是,每个节点会创建出不同的队列,也就是不同节点会产生不同的结果,另外注意的是队列一定要动态队列。
returnnewQueue(MY_QUEUE_NAME,true,false,true);
```/***Constructanewqueue,givenaname,durability,exclusiveandauto-deleteflags.*@paramnamethenameofthequeue.*@paramdurabletrueifwearedeclaringadurablequeue(thequeuewillsurviveaserverrestart)*@paramexclusivetrueifwearedeclaringanexclusivequeue(thequeuewillonlybeusedbythedeclarer's*connection)*@paramautoDeletetrueiftheservershoulddeletethequeuewhenitisnolongerinuse*/publicQueue(Stringname,booleandurable,booleanexclusive,booleanautoDelete){this(name,durable,exclusive,autoDelete,null);}```durable代表是否持久化队列,如果持久化,那么服务重启队列中的数据还在exclusive排他队列,如果设定为true,那么这个队列仅对这个连接可见autoDelete自动删除,这个自动删除并不是执行一段时间字段删除,而是失去监听后自动删除,这次我设定就是自动删除,因为如果不设定自动删除,重启节点后,会生成一个新节点而原有节点也不会删除,会导致rabbitmq出现队列堆积。
作者:千云热心网友 时间:2024-10-17 20:16
场景服务是多节点部署的,配置由于很少改动又经常被查询,所以将配置数据放入本地缓存中,修改配置后,更新缓存,但是只能更新当前访问节点的缓存,无法修改其他节点的缓存,导致查询数据出现问题。
分析如何在其中一个节点更新缓存也同时更新其他节点的缓存呢?消息队列是否可以解决此问题呢?
解决使用Rabbit的广播模式,可以实现,使用交换机绑定队列,而队列在每一个服务节点启动后都生成一个唯一队列和交换机绑定,这样就实现了当发现消息给交换机,会有多个不同的队列对消息进行处理
实现创建配置,声明交换机,动态队列,绑定交换机队列,绑定监听
/***@author子羽*@Description广播模式清除本地缓存*@Date2021/8/27*/@ConfigurationpublicclassCacheRabbitConfig{publicstaticfinalStringMY_FANOUTEXCHANGE_NAME="local-cache-exchange";/***用UUID来生成一个queue名称,也可以用服务器IP端口作为queue名称*/publicstaticfinalStringMY_QUEUE_NAME=UUID.randomUUID().toString();@AutowiredRabbitTemplaterabbitTemplate;/***创建动态queue自动删除队列,不然会造成队列堆积*@return*/@BeanpublicQueuemyQueue(){returnnewQueue(MY_QUEUE_NAME,true,false,true);}/***创建Exchange*@return*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(MY_FANOUTEXCHANGE_NAME,true,false);}/***绑定当前queue到Exchange*@return*/@BeanpublicBindingbindingExchangeMyQueue(){returnBindingBuilder.bind(myQueue()).to(fanoutExchange());}/***设置消息处理*@return*/@BeanpublicSimpleMessageListenerContainermqMessageContainer(ClearCacheListenerclearCacheListener){SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());container.setQueueNames(MY_QUEUE_NAME);container.setExposeListenerChannel(true);//设置每个消费者获取的最大的消息数量container.setPrefetchCount(1);//消费者个数container.setConcurrentConsumers(1);//设置确认模式为手工确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息处理类container.setMessageListener(clearCacheListener);returncontainer;}}设置监听实现
/***@author子羽*@Description监听删除缓存*@Date2021/8/27*/@ComponentpublicclassClearCacheListenerimplementsChannelAwareMessageListener{privatestaticfinalLoggerlog=LoggerFactory.getLogger(ClearCacheListener.class);@AutowiredprivateSysConfigServicesysConfigService;@OverridepublicvoidonMessage(Messagemessage,Channelchannel){try{log.info("接收删除系统配置缓存消息,correlationId:[{}]",newString(message.getMessageProperties().getCorrelationId()));//删除本地缓存sysConfigService.cleanLocal();channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){log.error("处理清除缓存消息失败",e);}}}建立消息生产者
/***@author子羽*@descript清除内外部多节点缓存*/@ComponentpublicclassClearCacheProducerextendsAbstractRabbitProducer{protectedstaticfinalLoggerlogger=LogManager.getCurrentClassLogger();publicClearCacheProducer(RabbitTemplaterabbitTemplate){super(rabbitTemplate,MQConstant.APP_ID);}publicvoidsendDelSystemCacheMessage(){CorrelationDatacorrelationData=getCorrelationData();logger.info("发送消息至缓存队列,correlationId:{}消息:[{}]",correlationData.getId(),"清除系统配置缓存");this.sendMsg("local-cache-exchange","","",correlationData);logger.info("发送消息成功correlationId:{}",correlationData.getId());}}调用清除节点缓存
/***<清空缓存>**@see[类#方法]*/publicvoidclean(){//先清除本节点缓存this.cache.clean();//清除内外部节点系统配置缓存clearCacheProducer.sendDelSystemCacheMessage();}结语整体实现还是比较简单的,主要的一个比较难想到的是,每个节点会创建出不同的队列,也就是不同节点会产生不同的结果,另外注意的是队列一定要动态队列。
returnnewQueue(MY_QUEUE_NAME,true,false,true);
```/***Constructanewqueue,givenaname,durability,exclusiveandauto-deleteflags.*@paramnamethenameofthequeue.*@paramdurabletrueifwearedeclaringadurablequeue(thequeuewillsurviveaserverrestart)*@paramexclusivetrueifwearedeclaringanexclusivequeue(thequeuewillonlybeusedbythedeclarer's*connection)*@paramautoDeletetrueiftheservershoulddeletethequeuewhenitisnolongerinuse*/publicQueue(Stringname,booleandurable,booleanexclusive,booleanautoDelete){this(name,durable,exclusive,autoDelete,null);}```durable代表是否持久化队列,如果持久化,那么服务重启队列中的数据还在exclusive排他队列,如果设定为true,那么这个队列仅对这个连接可见autoDelete自动删除,这个自动删除并不是执行一段时间字段删除,而是失去监听后自动删除,这次我设定就是自动删除,因为如果不设定自动删除,重启节点后,会生成一个新节点而原有节点也不会删除,会导致rabbitmq出现队列堆积。
作者:千云