发布网友 发布时间:2024-10-01 17:31
共1个回答
热心网友 时间:2024-10-17 20:14
RocketMQBinder集成消息订阅AbstractMessageChannelBinder类中提供了创建MessageProducer的协议,在初始化Binder的时候加载createConsumerEndpoint方法。
RocketMQMessageChannelBinder完成RocketMQInboundChannelAdapter的创建和初始化。
RocketMQMessageChannelBinder的createConsumerEndpoint方法:
@OverrideprotectedMessageProducercreateConsumerEndpoint(ConsumerDestinationdestination,Stringgroup,ExtendedConsumerProperties<RocketMQConsumerProperties>consumerProperties)throwsException{if(group==null||"".equals(group)){thrownewRuntimeException("'groupmustbeconfiguredforchannel"+destination.getName());}RocketMQListenerBindingContainerlistenerContainer=newRocketMQListenerBindingContainer(consumerProperties,rocketBinderConfigurationProperties,this);listenerContainer.setConsumerGroup(group);listenerContainer.setTopic(destination.getName());listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());listenerContainer.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());listenerContainer.setDelayLevelWhenNextConsume(consumerProperties.getExtension().getDelayLevelWhenNextConsume());listenerContainer.setNameServer(rocketBinderConfigurationProperties.getNameServer());listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));RocketMQInboundChannelAdapterrocketInboundChannelAdapter=newRocketMQInboundChannelAdapter(listenerContainer,consumerProperties,instrumentationManager);topicInUse.put(destination.getName(),group);ErrorInfrastructureerrorInfrastructure=registerErrorInfrastructure(destination,group,consumerProperties);if(consumerProperties.getMaxAttempts()>1){rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());}else{rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());}returnrocketInboundChannelAdapter;}RocketMQInboundChannelAdapter是适配器,需要适配SpringFramework的重试和回调机制,用来订阅消息和转化消息格式。RocketMQListenerBindingContainer是对RocketMQ客户端API的封装,适配器中持有它的对象。
RocketMQ提供两种消费模式:顺序消费和并发消费。RocketMQ客户端API中顺序消费的默认监听器是DefaultMessageListenerOrderly,并发消费的默认监听器是DefaultMessageListenerConcurrently类,无论哪个消费模式,监听器收到的消息都会回调RocketMQListener
RocketMQInboundChannelAdapter中创建和初始化RocketMQListener的实现类
RocketMQInboundChannelAdapter
@OverrideprotectedvoidonInit(){if(consumerProperties==null||!consumerProperties.getExtension().getEnabled()){return;}super.onInit();if(this.retryTemplate!=null){Assert.state(getErrorChannel()==null,"Cannothavean'errorChannel'propertywhena'RetryTemplate'is"+"provided;usean'ErrorMessageSendingRecoverer'inthe'recoveryCallback'propertyto"+"sendanerrormessagewhenretriesareexhausted");}BindingRocketMQListenerlistener=newBindingRocketMQListener();rocketMQListenerContainer.setRocketMQListener(listener);if(retryTemplate!=null){this.retryTemplate.registerListener(listener);}try{rocketMQListenerContainer.afterPropertiesSet();}catch(Exceptione){log.error("rocketMQListenerContaineriniterror:"+e.getMessage(),e);thrownewIllegalArgumentException("rocketMQListenerContaineriniterror:"+e.getMessage(),e);}instrumentationManager.addHealthInstrumentation(newInstrumentation(rocketMQListenerContainer.getTopic()+rocketMQListenerContainer.getConsumerGroup()));}protectedclassBindingRocketMQListenerimplementsRocketMQListener<Message>,RetryListener{@OverridepublicvoidonMessage(Messagemessage){booleanenableRetry=RocketMQInboundChannelAdapter.this.retryTemplate!=null;if(enableRetry){RocketMQInboundChannelAdapter.this.retryTemplate.execute(context->{RocketMQInboundChannelAdapter.this.sendMessage(message);returnnull;},(RecoveryCallback<Object>)RocketMQInboundChannelAdapter.this.recoveryCallback);}else{RocketMQInboundChannelAdapter.this.sendMessage(message);}}@Overridepublic<T,EextendsThrowable>booleanopen(RetryContextcontext,RetryCallback<T,E>callback){returntrue;}@Overridepublic<T,EextendsThrowable>voidclose(RetryContextcontext,RetryCallback<T,E>callback,Throwablethrowable){}@Overridepublic<T,EextendsThrowable>voidonError(RetryContextcontext,RetryCallback<T,E>callback,Throwablethrowable){}}DefaultMessageListenerOrderly收到RocketMQ消息后,先回调BindingRocketMQListener的onMessage方法,再调用RocketMQInboundChannelAdapter父类的sendMessage方法将消息发送到DirectChannel
SpringCloudStream的接收消息和发送消息的消息模型是一致的,Binder中接收的消息先发送到MessageChannel,由订阅的MessageChannel通过Dispatcher转发到对应的MessageHandler进行处理。
RocketMQInboundChannelAdapter的父类MessageProducerSupport的getOutputChannel()得到的MessageChannel是在初始化RocketMQBinder时传入的DirectChannel
MessageProducerSupport的getOutputChannel方法:
@OverridepublicMessageChannelgetOutputChannel(){if(this.outputChannelName!=null){synchronized(this){if(this.outputChannelName!=null){this.outputChannel=getChannelResolver().resolveDestination(this.outputChannelName);this.outputChannelName=null;}}}returnthis.outputChannel;}MessagingTemplate继承GenericMessagingTemplate类,实际执行doSend()方法发送消息
MessageChannel的实例是DirectChannel对象,复用前面消息发送流程,通过消息分发类MessageDispatcher把消息分发给MessageHandler
DirectChannel对应的消息处理器是StreamListenerMessageHandler
publicclassStreamListenerMessageHandlerextendsAbstractReplyProducingMessageHandler{privatefinalInvocableHandlerMethodinvocableHandlerMethod;privatefinalbooleancopyHeaders;StreamListenerMessageHandler(InvocableHandlerMethodinvocableHandlerMethod,booleancopyHeaders,String[]notPropagatedHeaders){super();this.invocableHandlerMethod=invocableHandlerMethod;this.copyHeaders=copyHeaders;this.setNotPropagatedHeaders(notPropagatedHeaders);}@OverrideprotectedbooleanshouldCopyRequestHeaders(){returnthis.copyHeaders;}publicbooleanisVoid(){returnthis.invocableHandlerMethod.isVoid();}@OverrideprotectedObjecthandleRequestMessage(Message<?>requestMessage){try{returnthis.invocableHandlerMethod.invoke(requestMessage);}catch(Exceptione){if(einstanceofMessagingException){throw(MessagingException)e;}else{thrownewMessagingException(requestMessage,"Exceptionthrownwhileinvoking"+this.invocableHandlerMethod.getShortLogMessage(),e);}}}}InvocableHandlerMethod使用java反射机制完成回调,StreamListenerMessageHandler与@
StreamListenerAnnotationBeanPostProcessor的afterSingletonsInstantiated方法:
@OverridepublicfinalvoidafterSingletonsInstantiated(){this.injectAndPostProcessDependencies();EvaluationContextevaluationContext=IntegrationContextUtils.getEvaluationContext(this.applicationContext.getBeanFactory());for(Map.Entry<String,List<StreamListenerHandlerMethodMapping>>mappedBindingEntry:this.mappedListenerMethods.entrySet()){ArrayList<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper>handlers;handlers=newArrayList<>();for(StreamListenerHandlerMethodMappingmapping:mappedBindingEntry.getValue()){finalInvocableHandlerMethodinvocableHandlerMethod=this.messageHandlerMethodFactory.createInvocableHandlerMethod(mapping.getTargetBean(),checkProxy(mapping.getMethod(),mapping.getTargetBean()));StreamListenerMessageHandlerstreamListenerMessageHandler=newStreamListenerMessageHandler(invocableHandlerMethod,resolveExpressionAsBoolean(mapping.getCopyHeaders(),"copyHeaders"),this.springIntegrationProperties.getMessageHandlerNotPropagatedHeaders());streamListenerMessageHandler.setApplicationContext(this.applicationContext);streamListenerMessageHandler.setBeanFactory(this.applicationContext.getBeanFactory());if(StringUtils.hasText(mapping.getDefaultOutputChannel())){streamListenerMessageHandler.setOutputChannelName(mapping.getDefaultOutputChannel());}streamListenerMessageHandler.afterPropertiesSet();if(StringUtils.hasText(mapping.getCondition())){StringconditionAsString=resolveExpressionAsString(mapping.getCondition(),"condition");Expressioncondition=SPEL_EXPRESSION_PARSER.parseExpression(conditionAsString);handlers.add(newDispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(condition,streamListenerMessageHandler));}else{handlers.add(newDispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(null,streamListenerMessageHandler));}}if(handlers.size()>1){for(DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapperhandler:handlers){Assert.isTrue(handler.isVoid(),StreamListenerErrorMessages.MULTIPLE_VALUE_RETURNING_METHODS);}}AbstractReplyProducingMessageHandlerhandler;if(handlers.size()>1||handlers.get(0).getCondition()!=null){handler=newDispatchingStreamListenerMessageHandler(handlers,evaluationContext);}else{handler=handlers.get(0).getStreamListenerMessageHandler();}handler.setApplicationContext(this.applicationContext);handler.setChannelResolver(this.binderAwareChannelResolver);handler.afterPropertiesSet();this.applicationContext.getBeanFactory().registerSingleton(handler.getClass().getSimpleName()+handler.hashCode(),handler);this.applicationContext.getBean(mappedBindingEntry.getKey(),SubscribableChannel.class).subscribe(handler);}this.mappedListenerMethods.clear();}在Spring容器管理的所有单例对象初始化完成之后,遍历StreamListenerHandlerMethodMapping,进行InvocableHandlerMethod和StreamListenerMessageHandler的创建和初始化
StreamListenerHandlerMethodMapping保存了StreamListener和InvocableHandlerMethod的映射关系,映射关系的创建是在StreamListenerAnnotationBeanPostProcessor的postProcessAfterInitialization()方法
@OverridepublicfinalObjectpostProcessAfterInitialization(Objectbean,finalStringbeanName)throwsBeansException{Class<?>targetClass=AopUtils.isAopProxy(bean)?AopUtils.getTargetClass(bean):bean.getClass();Method[]uniqueDeclaredMethods=ReflectionUtils.getUniqueDeclaredMethods(targetClass);for(Methodmethod:uniqueDeclaredMethods){StreamListenerstreamListener=AnnotatedElementUtils.findMergedAnnotation(method,StreamListener.class);if(streamListener!=null&&!method.isBridge()){this.streamListenerCallbacks.add(()->{Assert.isTrue(method.g热心网友 时间:2024-10-17 20:12
RocketMQBinder集成消息订阅AbstractMessageChannelBinder类中提供了创建MessageProducer的协议,在初始化Binder的时候加载createConsumerEndpoint方法。
RocketMQMessageChannelBinder完成RocketMQInboundChannelAdapter的创建和初始化。
RocketMQMessageChannelBinder的createConsumerEndpoint方法:
@OverrideprotectedMessageProducercreateConsumerEndpoint(ConsumerDestinationdestination,Stringgroup,ExtendedConsumerProperties<RocketMQConsumerProperties>consumerProperties)throwsException{if(group==null||"".equals(group)){thrownewRuntimeException("'groupmustbeconfiguredforchannel"+destination.getName());}RocketMQListenerBindingContainerlistenerContainer=newRocketMQListenerBindingContainer(consumerProperties,rocketBinderConfigurationProperties,this);listenerContainer.setConsumerGroup(group);listenerContainer.setTopic(destination.getName());listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());listenerContainer.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());listenerContainer.setDelayLevelWhenNextConsume(consumerProperties.getExtension().getDelayLevelWhenNextConsume());listenerContainer.setNameServer(rocketBinderConfigurationProperties.getNameServer());listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));RocketMQInboundChannelAdapterrocketInboundChannelAdapter=newRocketMQInboundChannelAdapter(listenerContainer,consumerProperties,instrumentationManager);topicInUse.put(destination.getName(),group);ErrorInfrastructureerrorInfrastructure=registerErrorInfrastructure(destination,group,consumerProperties);if(consumerProperties.getMaxAttempts()>1){rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());}else{rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());}returnrocketInboundChannelAdapter;}RocketMQInboundChannelAdapter是适配器,需要适配SpringFramework的重试和回调机制,用来订阅消息和转化消息格式。RocketMQListenerBindingContainer是对RocketMQ客户端API的封装,适配器中持有它的对象。
RocketMQ提供两种消费模式:顺序消费和并发消费。RocketMQ客户端API中顺序消费的默认监听器是DefaultMessageListenerOrderly,并发消费的默认监听器是DefaultMessageListenerConcurrently类,无论哪个消费模式,监听器收到的消息都会回调RocketMQListener
RocketMQInboundChannelAdapter中创建和初始化RocketMQListener的实现类
RocketMQInboundChannelAdapter
@OverrideprotectedvoidonInit(){if(consumerProperties==null||!consumerProperties.getExtension().getEnabled()){return;}super.onInit();if(this.retryTemplate!=null){Assert.state(getErrorChannel()==null,"Cannothavean'errorChannel'propertywhena'RetryTemplate'is"+"provided;usean'ErrorMessageSendingRecoverer'inthe'recoveryCallback'propertyto"+"sendanerrormessagewhenretriesareexhausted");}BindingRocketMQListenerlistener=newBindingRocketMQListener();rocketMQListenerContainer.setRocketMQListener(listener);if(retryTemplate!=null){this.retryTemplate.registerListener(listener);}try{rocketMQListenerContainer.afterPropertiesSet();}catch(Exceptione){log.error("rocketMQListenerContaineriniterror:"+e.getMessage(),e);thrownewIllegalArgumentException("rocketMQListenerContaineriniterror:"+e.getMessage(),e);}instrumentationManager.addHealthInstrumentation(newInstrumentation(rocketMQListenerContainer.getTopic()+rocketMQListenerContainer.getConsumerGroup()));}protectedclassBindingRocketMQListenerimplementsRocketMQListener<Message>,RetryListener{@OverridepublicvoidonMessage(Messagemessage){booleanenableRetry=RocketMQInboundChannelAdapter.this.retryTemplate!=null;if(enableRetry){RocketMQInboundChannelAdapter.this.retryTemplate.execute(context->{RocketMQInboundChannelAdapter.this.sendMessage(message);returnnull;},(RecoveryCallback<Object>)RocketMQInboundChannelAdapter.this.recoveryCallback);}else{RocketMQInboundChannelAdapter.this.sendMessage(message);}}@Overridepublic<T,EextendsThrowable>booleanopen(RetryContextcontext,RetryCallback<T,E>callback){returntrue;}@Overridepublic<T,EextendsThrowable>voidclose(RetryContextcontext,RetryCallback<T,E>callback,Throwablethrowable){}@Overridepublic<T,EextendsThrowable>voidonError(RetryContextcontext,RetryCallback<T,E>callback,Throwablethrowable){}}DefaultMessageListenerOrderly收到RocketMQ消息后,先回调BindingRocketMQListener的onMessage方法,再调用RocketMQInboundChannelAdapter父类的sendMessage方法将消息发送到DirectChannel
SpringCloudStream的接收消息和发送消息的消息模型是一致的,Binder中接收的消息先发送到MessageChannel,由订阅的MessageChannel通过Dispatcher转发到对应的MessageHandler进行处理。
RocketMQInboundChannelAdapter的父类MessageProducerSupport的getOutputChannel()得到的MessageChannel是在初始化RocketMQBinder时传入的DirectChannel
MessageProducerSupport的getOutputChannel方法:
@OverridepublicMessageChannelgetOutputChannel(){if(this.outputChannelName!=null){synchronized(this){if(this.outputChannelName!=null){this.outputChannel=getChannelResolver().resolveDestination(this.outputChannelName);this.outputChannelName=null;}}}returnthis.outputChannel;}MessagingTemplate继承GenericMessagingTemplate类,实际执行doSend()方法发送消息
MessageChannel的实例是DirectChannel对象,复用前面消息发送流程,通过消息分发类MessageDispatcher把消息分发给MessageHandler
DirectChannel对应的消息处理器是StreamListenerMessageHandler
publicclassStreamListenerMessageHandlerextendsAbstractReplyProducingMessageHandler{privatefinalInvocableHandlerMethodinvocableHandlerMethod;privatefinalbooleancopyHeaders;StreamListenerMessageHandler(InvocableHandlerMethodinvocableHandlerMethod,booleancopyHeaders,String[]notPropagatedHeaders){super();this.invocableHandlerMethod=invocableHandlerMethod;this.copyHeaders=copyHeaders;this.setNotPropagatedHeaders(notPropagatedHeaders);}@OverrideprotectedbooleanshouldCopyRequestHeaders(){returnthis.copyHeaders;}publicbooleanisVoid(){returnthis.invocableHandlerMethod.isVoid();}@OverrideprotectedObjecthandleRequestMessage(Message<?>requestMessage){try{returnthis.invocableHandlerMethod.invoke(requestMessage);}catch(Exceptione){if(einstanceofMessagingException){throw(MessagingException)e;}else{thrownewMessagingException(requestMessage,"Exceptionthrownwhileinvoking"+this.invocableHandlerMethod.getShortLogMessage(),e);}}}}InvocableHandlerMethod使用java反射机制完成回调,StreamListenerMessageHandler与@
StreamListenerAnnotationBeanPostProcessor的afterSingletonsInstantiated方法:
@OverridepublicfinalvoidafterSingletonsInstantiated(){this.injectAndPostProcessDependencies();EvaluationContextevaluationContext=IntegrationContextUtils.getEvaluationContext(this.applicationContext.getBeanFactory());for(Map.Entry<String,List<StreamListenerHandlerMethodMapping>>mappedBindingEntry:this.mappedListenerMethods.entrySet()){ArrayList<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper>handlers;handlers=newArrayList<>();for(StreamListenerHandlerMethodMappingmapping:mappedBindingEntry.getValue()){finalInvocableHandlerMethodinvocableHandlerMethod=this.messageHandlerMethodFactory.createInvocableHandlerMethod(mapping.getTargetBean(),checkProxy(mapping.getMethod(),mapping.getTargetBean()));StreamListenerMessageHandlerstreamListenerMessageHandler=newStreamListenerMessageHandler(invocableHandlerMethod,resolveExpressionAsBoolean(mapping.getCopyHeaders(),"copyHeaders"),this.springIntegrationProperties.getMessageHandlerNotPropagatedHeaders());streamListenerMessageHandler.setApplicationContext(this.applicationContext);streamListenerMessageHandler.setBeanFactory(this.applicationContext.getBeanFactory());if(StringUtils.hasText(mapping.getDefaultOutputChannel())){streamListenerMessageHandler.setOutputChannelName(mapping.getDefaultOutputChannel());}streamListenerMessageHandler.afterPropertiesSet();if(StringUtils.hasText(mapping.getCondition())){StringconditionAsString=resolveExpressionAsString(mapping.getCondition(),"condition");Expressioncondition=SPEL_EXPRESSION_PARSER.parseExpression(conditionAsString);handlers.add(newDispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(condition,streamListenerMessageHandler));}else{handlers.add(newDispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(null,streamListenerMessageHandler));}}if(handlers.size()>1){for(DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapperhandler:handlers){Assert.isTrue(handler.isVoid(),StreamListenerErrorMessages.MULTIPLE_VALUE_RETURNING_METHODS);}}AbstractReplyProducingMessageHandlerhandler;if(handlers.size()>1||handlers.get(0).getCondition()!=null){handler=newDispatchingStreamListenerMessageHandler(handlers,evaluationContext);}else{handler=handlers.get(0).getStreamListenerMessageHandler();}handler.setApplicationContext(this.applicationContext);handler.setChannelResolver(this.binderAwareChannelResolver);handler.afterPropertiesSet();this.applicationContext.getBeanFactory().registerSingleton(handler.getClass().getSimpleName()+handler.hashCode(),handler);this.applicationContext.getBean(mappedBindingEntry.getKey(),SubscribableChannel.class).subscribe(handler);}this.mappedListenerMethods.clear();}在Spring容器管理的所有单例对象初始化完成之后,遍历StreamListenerHandlerMethodMapping,进行InvocableHandlerMethod和StreamListenerMessageHandler的创建和初始化
StreamListenerHandlerMethodMapping保存了StreamListener和InvocableHandlerMethod的映射关系,映射关系的创建是在StreamListenerAnnotationBeanPostProcessor的postProcessAfterInitialization()方法
@OverridepublicfinalObjectpostProcessAfterInitialization(Objectbean,finalStringbeanName)throwsBeansException{Class<?>targetClass=AopUtils.isAopProxy(bean)?AopUtils.getTargetClass(bean):bean.getClass();Method[]uniqueDeclaredMethods=ReflectionUtils.getUniqueDeclaredMethods(targetClass);for(Methodmethod:uniqueDeclaredMethods){StreamListenerstreamListener=AnnotatedElementUtils.findMergedAnnotation(method,StreamListener.class);if(streamListener!=null&&!method.isBridge()){this.streamListenerCallbacks.add(()->{Assert.isTrue(method.g