问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

rocketmq 相同的message key会在同一个queue下吗

发布网友 发布时间:2022-04-11 02:40

我来回答

2个回答

懂视网 时间:2022-04-11 07:01

  • SendMessageRequestHeader定义了batch属性,用于标识是否是MessageBatch
  • processRequest

    rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    
     private List<ConsumeMessageHook> consumeMessageHookList;
    
     public SendMessageProcessor(final BrokerController brokerController) {
     super(brokerController);
     }
    
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx,
          RemotingCommand request) throws RemotingCommandException {
     SendMessageContext mqtraceContext;
     switch (request.getCode()) {
      case RequestCode.CONSUMER_SEND_MSG_BACK:
      return this.consumerSendMsgBack(ctx, request);
      default:
      SendMessageRequestHeader requestHeader = parseRequestHeader(request);
      if (requestHeader == null) {
       return null;
      }
    
      mqtraceContext = buildMsgContext(ctx, requestHeader);
      this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
    
      RemotingCommand response;
      if (requestHeader.isBatch()) {
       response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
      } else {
       response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
      }
    
      this.executeSendMessageHookAfter(response, mqtraceContext);
      return response;
     }
     }
    
     //......
    
    }
    
  • processRequest方法在判断requestHeader.isBatch()时会执行sendBatchMessage
  • sendBatchMessage

    rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    
     private List<ConsumeMessageHook> consumeMessageHookList;
    
     public SendMessageProcessor(final BrokerController brokerController) {
     super(brokerController);
     }
    
     //......
    
     private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
          final RemotingCommand request,
          final SendMessageContext sendMessageContext,
          final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    
     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    
     response.setOpaque(request.getOpaque());
    
     response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
     response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    
     log.debug("Receive SendMessage request command {}", request);
    
     final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
     if (this.brokerController.getMessageStore().now() < startTimstamp) {
      response.setCode(ResponseCode.SYSTEM_ERROR);
      response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
      return response;
     }
    
     response.setCode(-1);
     super.msgCheck(ctx, requestHeader, response);
     if (response.getCode() != -1) {
      return response;
     }
    
     int queueIdInt = requestHeader.getQueueId();
     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    
     if (queueIdInt < 0) {
      queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
     }
    
     if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
      response.setCode(ResponseCode.MESSAGE_ILLEGAL);
      response.setRemark("message topic length too long " + requestHeader.getTopic().length());
      return response;
     }
    
     if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
      response.setCode(ResponseCode.MESSAGE_ILLEGAL);
      response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
      return response;
     }
     MessageExtBatch messageExtBatch = new MessageExtBatch();
     messageExtBatch.setTopic(requestHeader.getTopic());
     messageExtBatch.setQueueId(queueIdInt);
    
     int sysFlag = requestHeader.getSysFlag();
     if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
      sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
     }
     messageExtBatch.setSysFlag(sysFlag);
    
     messageExtBatch.setFlag(requestHeader.getFlag());
     MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
     messageExtBatch.setBody(request.getBody());
     messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
     messageExtBatch.setBornHost(ctx.channel().remoteAddress());
     messageExtBatch.setStoreHost(this.getStoreHost());
     messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
     String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
     MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
    
     PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
    
     return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
     }
    
     //......
    
    }
    
  • sendBatchMessage方法会执行msgCheck,之后构造messageExtBatch,然后执行brokerController.getMessageStore().putMessages(messageExtBatch),之后通过handlePutMessageResult方法处理PutMessageResult
  • MessageExtBatch

    rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java

    public class MessageExtBatch extends MessageExt {
    
     private static final long serialVersionUID = -2353110995348498537L;
    
     public ByteBuffer wrap() {
     assert getBody() != null;
     return ByteBuffer.wrap(getBody(), 0, getBody().length);
     }
    
     private ByteBuffer encodedBuff;
    
     public ByteBuffer getEncodedBuff() {
     return encodedBuff;
     }
    
     public void setEncodedBuff(ByteBuffer encodedBuff) {
     this.encodedBuff = encodedBuff;
     }
    }
    
  • MessageExtBatch继承了MessageExt,它提供了wrap方法,用于将body包装为ByteBuffer;它同时定义了encodedBuff,并提供了get、set方法
  • MessageExtBatchEncoder

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

     public static class MessageExtBatchEncoder {
     // Store the message content
     private final ByteBuffer msgBatchMemory;
     // The maximum length of the message
     private final int maxMessageSize;
    
     MessageExtBatchEncoder(final int size) {
      this.msgBatchMemory = ByteBuffer.allocateDirect(size);
      this.maxMessageSize = size;
     }
    
     public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
      msgBatchMemory.clear(); //not thread-safe
      int totalMsgLen = 0;
      ByteBuffer messagesByteBuff = messageExtBatch.wrap();
    
      int sysFlag = messageExtBatch.getSysFlag();
      int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
      int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
      ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
      ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
    
      while (messagesByteBuff.hasRemaining()) {
      // 1 TOTALSIZE
      messagesByteBuff.getInt();
      // 2 MAGICCODE
      messagesByteBuff.getInt();
      // 3 BODYCRC
      messagesByteBuff.getInt();
      // 4 FLAG
      int flag = messagesByteBuff.getInt();
      // 5 BODY
      int bodyLen = messagesByteBuff.getInt();
      int bodyPos = messagesByteBuff.position();
      int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
      messagesByteBuff.position(bodyPos + bodyLen);
      // 6 properties
      short propertiesLen = messagesByteBuff.getShort();
      int propertiesPos = messagesByteBuff.position();
      messagesByteBuff.position(propertiesPos + propertiesLen);
    
      final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    
      final int topicLength = topicData.length;
    
      final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
    
      // Exceeds the maximum message
      if (msgLen > this.maxMessageSize) {
       CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
       + ", maxMessageSize: " + this.maxMessageSize);
       throw new RuntimeException("message size exceeded");
      }
    
      totalMsgLen += msgLen;
      // Determines whether there is sufficient free space
      if (totalMsgLen > maxMessageSize) {
       throw new RuntimeException("message size exceeded");
      }
    
      // 1 TOTALSIZE
      this.msgBatchMemory.putInt(msgLen);
      // 2 MAGICCODE
      this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
      // 3 BODYCRC
      this.msgBatchMemory.putInt(bodyCrc);
      // 4 QUEUEID
      this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
      // 5 FLAG
      this.msgBatchMemory.putInt(flag);
      // 6 QUEUEOFFSET
      this.msgBatchMemory.putLong(0);
      // 7 PHYSICALOFFSET
      this.msgBatchMemory.putLong(0);
      // 8 SYSFLAG
      this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
      // 9 BORNTIMESTAMP
      this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
      // 10 BORNHOST
      this.resetByteBuffer(bornHostHolder, bornHostLength);
      this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
      // 11 STORETIMESTAMP
      this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
      // 12 STOREHOSTADDRESS
      this.resetByteBuffer(storeHostHolder, storeHostLength);
      this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
      // 13 RECONSUMETIMES
      this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
      // 14 Prepared Transaction Offset, batch does not support transaction
      this.msgBatchMemory.putLong(0);
      // 15 BODY
      this.msgBatchMemory.putInt(bodyLen);
      if (bodyLen > 0)
       this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
      // 16 TOPIC
      this.msgBatchMemory.put((byte) topicLength);
      this.msgBatchMemory.put(topicData);
      // 17 PROPERTIES
      this.msgBatchMemory.putShort(propertiesLen);
      if (propertiesLen > 0)
       this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
      }
      msgBatchMemory.flip();
      return msgBatchMemory;
     }
    
     private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
      byteBuffer.flip();
      byteBuffer.limit(limit);
     }
    
     }
    
  • MessageExtBatchEncoder提供了encode方法,它首先通过messageExtBatch.wrap()得到messagesByteBuff,之后重新组装数据到msgBatchMemory
  • putMessages

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

    public class CommitLog {
    
    	//......
    
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
     messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
     AppendMessageResult result;
    
     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
     final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
    
     if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
      return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
     }
     if (messageExtBatch.getDelayTimeLevel() > 0) {
      return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
     }
    
     InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
     if (bornSocketAddress.getAddress() instanceof Inet6Address) {
      messageExtBatch.setBornHostV6Flag();
     }
    
     InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
     if (storeSocketAddress.getAddress() instanceof Inet6Address) {
      messageExtBatch.setStoreHostAddressV6Flag();
     }
    
     long eclipsedTimeInLock = 0;
     MappedFile unlockMappedFile = null;
     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    
     //fine-grained lock instead of the coarse-grained
     MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
    
     messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
    
     //......
    
     }
    
     //......
    } 
    
  • putMessages方法会使用batchEncoder.encode(messageExtBatch)来设置messageExtBatch的encodedBuff
  • doAppend

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

    public class CommitLog {
    
    	//......
    
     class DefaultAppendMessageCallback implements AppendMessageCallback {
    
     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
      final MessageExtBatch messageExtBatch) {
      byteBuffer.mark();
      //physical offset
      long wroteOffset = fileFromOffset + byteBuffer.position();
      // Record ConsumeQueue information
      keyBuilder.setLength(0);
      keyBuilder.append(messageExtBatch.getTopic());
      keyBuilder.append(‘-‘);
      keyBuilder.append(messageExtBatch.getQueueId());
      String key = keyBuilder.toString();
      Long queueOffset = CommitLog.this.topicQueueTable.get(key);
      if (null == queueOffset) {
      queueOffset = 0L;
      CommitLog.this.topicQueueTable.put(key, queueOffset);
      }
      long beginQueueOffset = queueOffset;
      int totalMsgLen = 0;
      int msgNum = 0;
      msgIdBuilder.setLength(0);
      final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
      ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
    
      int sysFlag = messageExtBatch.getSysFlag();
      int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
      ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
    
      //......
    
     }
     }
    
     //......
    } 
    
  • doAppend方法则读取messageExtBatch.getEncodedBuff()获取messagesByteBuff
  • 小结

    SendMessageRequestHeader定义了batch属性,用于标识是否是MessageBatch;processRequest方法在判断requestHeader.isBatch()时会执行sendBatchMessage;sendBatchMessage方法会执行msgCheck,之后构造messageExtBatch,然后执行brokerController.getMessageStore().putMessages(messageExtBatch),之后通过handlePutMessageResult方法处理PutMessageResult

    doc

  • SendMessageProcessor
  • 来源:http://www.1994july.club/seo/?p=1697

    聊聊rocketmq的sendBatchMessage

    标签:etop   setflag   seh   cut   cee   override   dex   eset   rand   

    热心网友 时间:2022-04-11 04:09

    Native层就是本地框架。 这些层大致如此区分: Java应用程序无需过多解释,基本可以理解为各个App,由Java语言实现。
    声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
    为什么我老是容易出汗,不管夏天还是冬天,只要一活动就出汗。这样要怎么... 一年四季长期爱出汗,一动就大汗淋淋 交通事故次要责任可以构成工伤认定吗 ...认定工伤九级,付次要责任那企业一次性伤残就业补助金也按_百度知... 交通事故次要责任工伤赔偿标准 才生下来的乌龟吃什么 大蒜四月份管理要点 HR要学什么 HR所需的法律 耳夹式耳机有哪些优点?蛇圣星环耳夹式耳机实测分享 盗窃罪司法解释包括哪些主要内容 我国最高法院的司法解释包括哪几种类型 立法解释,司法解释,行政解释 立法解释与司法解释? 司法解释有哪些作用 行政诉讼司法解释的内容有什么 新刑法司法解释的内容有哪些 此次最高法出台司法解释的主要内容有哪些? 司法解释和立法解释怎么区分和联系? 法律解释的一般方法包括什么? 司法解释是不是既包括最高人民*的解释,也包括最高人民*的解释? 司法解释的种类有那些? 填充墙梁底斜砌有哪几种砌法? 砌墙有那些砌筑方法,用途是什么? 砌墙技巧图解 砌墙不用粉涮有什么办法? 砌墙铺灰手法有几种 电脑已经设置了1分钟自动锁屏,但是时间到了却无法自动锁屏,求高手指点,设置没问题? PCA-6113P4R板卡驱动!!!帮帮小妹啊~~~ 有没有可以直接下载下来就是DVD格式电影或者电视剧的网站 心脏支架一年后复查需耍做那些检查最佳? 做心脏支架手术一年后,复查需做哪些检查呢? 心脏支架一年了,需要复查吗,如果需要该怎么复查 心脏支架术一年后要检查什么? 心脏支架手术一年后需要做什么检查 心脏放支架一年后用什麽办法复查最好? 做完心脏支架手术一年了要做些什么复查 心脏支架后一年后复查需要什么手续 放心脏支架一年后需要怎样复查 心脏支架后一年多了还须检查吗 心脏做支架一年后,有必要再做造影检查吗? 做了心脏支架一年后都是要复查什么 做了心脏支架手术一年后,复查要检查什么项 为啥我的玩游戏盒下载速度突然变成零 已经下到92.9了 我太难了? 求大神,gta5中文版,私我,我太难了 求大神分享我太难了(抖音版)-孤新阳_小奶瓶音乐百度云资源下载地址 那个,为什么华为不能进光遇,我太难了,算了,继续玩我的第5人格吧? 以前的账号过期了 我又退不出来 我已经试了很多次了 我用我的新号码也不行说是也是过期了 我太难了 我太难了。 我太难了!