(1).叙述

前面说过:DirectChannel发委托给:UnicastingDispatcher.dispatch进行请求的分发.

(2).UnicastingDispatcher

public class UnicastingDispatcher extends AbstractDispatcher {
    // ************************************************
    // 1.分发消息
    // ************************************************
    public final boolean dispatch(final Message<?> message) {
        // 如果有配置执行器,就交给执行器进行管理
        if (this.executor != null) { // false
            Runnable task = createMessageHandlingTask(message);
            this.executor.execute(task);
            return true;
        }// end if
        
       
       return this.doDispatch(message);
	}// end dispatch    
 
    private boolean doDispatch(Message<?> message) {
        // ************************************************
        // 2. 委托给父类,判断消息是否需要优化分发
        // ************************************************
        if (tryOptimizedDispatch(message)) {
            return true;
        }
        
        boolean success = false;
        Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
        if (!handlerIterator.hasNext()) {
            throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
        }
        
        List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
        while (!success && handlerIterator.hasNext()) {
            MessageHandler handler = handlerIterator.next();
            try {
                handler.handleMessage(message);
                success = true; // we have a winner.
            } catch (Exception e) {
                RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> "Dispatcher failed to deliver Message", e);
                exceptions.add(runtimeException);
                this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
            } //end catch
        }
        return success;
    }// end doDispatch
 
}

(3).AbstractDispatcher

public abstract class AbstractDispatcher implements MessageDispatcher {
    // ************************************************************
    // 3. 优化消息分发
    // ************************************************************
    protected boolean tryOptimizedDispatch(Message<?> message) {
        MessageHandler handler = this.theOneHandler;
        // org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler
        if (handler != null) {
            try {
                // 委托给:org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler
                // 进行消息处理
                handler.handleMessage(message);
                return true;
            } catch (Exception e) {
                throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> "Dispatcher failed to deliver Message", e);
            }
        } //end if
        return false;
	} //end tryOptimizedDispatch
}

(4).AbstractMessageHandler

public abstract class AbstractMessageHandler extends IntegrationObjectSupport
		implements MessageHandler, MessageHandlerMetrics, ConfigurableMetricsAware<AbstractMessageHandlerMetrics>,
		TrackableComponent, Orderable, CoreSubscriber<Message<?>> {

    // **************************************************
    //  4. 处理消息
    // **************************************************
    public void handleMessage(Message<?> message) {
		Assert.notNull(message, "Message must not be null");
		Assert.notNull(message.getPayload(), "Message payload must not be null"); //NOSONAR - false positive
		if (this.loggingEnabled && this.logger.isDebugEnabled()) {
			this.logger.debug(this + " received message: " + message);
		}
		MetricsContext start = null;
		boolean countsEnabled = this.countsEnabled;
		AbstractMessageHandlerMetrics handlerMetrics = this.handlerMetrics;
		SampleFacade sample = null;
		if (countsEnabled && this.metricsCaptor != null) { // false
			sample = this.metricsCaptor.start();
		}
  
		try {
			if (this.shouldTrack) { // false
				message = MessageHistory.write(message, this, getMessageBuilderFactory());
			}
   
			if (countsEnabled) { // false
				start = handlerMetrics.beforeHandle();
				handleMessageInternal(message);
				if (sample != null) {
					sample.stop(sendTimer());
				}
				handlerMetrics.afterHandle(start, true);
			} else {
                           // *********************************************
                           // 5. 委托给子类处理
                           // AbstractMessageChannelBinder$SendingHandler
                           // *********************************************
				handleMessageInternal(message);
			}
		} catch (Exception e) {
			if (sample != null) {
				sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
			}
			if (countsEnabled) {
				handlerMetrics.afterHandle(start, false);
			}
			throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message,
					() -> "error occurred in message handler [" + this + "]", e);
		}
    } //end 
}

(5).RocketMQMessageChannelBinder$SendingHandler

public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>>
		extends AbstractBinder<MessageChannel, C, P>
		implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {


    private final class SendingHandler extends AbstractMessageHandler implements Lifecycle {
        
        protected void handleMessageInternal(Message<?> message) throws Exception {
            Message<?> messageToSend = (this.useNativeEncoding) ? message : serializeAndEmbedHeadersIfApplicable(message);
            // **************************************************************
            // **************************************************************
            // 6. 委托给:
            // com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler
            // 处理消息
            // **************************************************************
            // **************************************************************
            this.delegate.handleMessage(messageToSend);
        } //end handleMessageInternal
    } //end SendingHandler
    
    private Message<?> serializeAndEmbedHeadersIfApplicable(Message<?> message) throws Exception {
        // 拷贝message到MessageValues
        MessageValues transformed = serializePayloadIfNecessary(message);
        Object payload;
        if (this.embedHeaders) { // false
            Object contentType = transformed.get(MessageHeaders.CONTENT_TYPE);
            if (contentType != null) {
                transformed.put(MessageHeaders.CONTENT_TYPE, contentType.toString());
            }
            
            Object originalContentType = transformed.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE);
            if (originalContentType != null) {
                transformed.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString());
            }
            
            payload = EmbeddedHeaderUtils.embedHeaders(transformed, this.embeddedHeaders);
        } else {
            payload = transformed.getPayload();
        }
        // 拷贝生产一个新的Message
        return getMessageBuilderFactory().withPayload(payload).copyHeaders(transformed.getHeaders()).build();
    }// end serializeAndEmbedHeadersIfApplicable
                  
}

(6).RocketMQMessageHandler

public class RocketMQMessageHandler 
       extends AbstractMessageHandler 
       implements Lifecycle {
    
    // 处理消息
    protected void handleMessageInternal(org.springframework.messaging.Message<?> message) {
        try {
			
             Map<String, String> jsonHeaders = headerMapper.fromHeaders(message.getHeaders());
            message = org.springframework.messaging.support.MessageBuilder.fromMessage(message).copyHeaders(jsonHeaders).build();

            final StringBuilder topicWithTags = new StringBuilder(destination);
            String tags = Optional.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("").toString();
            if (!StringUtils.isEmpty(tags)) {
                topicWithTags.append(":").append(tags);
            }

            SendResult sendRes = null;
            // 是否为事务消息
            if (transactional) {
                    sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,topicWithTags.toString(), message, message.getHeaders().get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
                    log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
            } else {
                    int delayLevel = 0;
                    try {
                           // 延迟级别
                            Object delayLevelObj = message.getHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
                    	if (delayLevelObj instanceof Number) {
                    		delayLevel = ((Number) delayLevelObj).intValue();
                    	}
                    	else if (delayLevelObj instanceof String) {
                    		delayLevel = Integer.parseInt((String) delayLevelObj);
                    	}
                    } catch (Exception e) {
			}
   
                   // 
			boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER);
			if (sync) {
                    	if (needSelectQueue) {
                    		sendRes = rocketMQTemplate.syncSendOrderly(
                    				topicWithTags.toString(), message, "",
                    				rocketMQTemplate.getProducer().getSendMsgTimeout());
                    	}
                    	else {
                    		sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
                    				message,
                    				rocketMQTemplate.getProducer().getSendMsgTimeout(),
                    				delayLevel);
                    	}
                    	log.debug("sync send to topic " + topicWithTags + " " + sendRes);
			} else {
                            Message<?> finalMessage = message;
                            SendCallback sendCallback = new SendCallback() {
						@Override
						public void onSuccess(SendResult sendResult) {
							log.debug("async send to topic " + topicWithTags + " "
									+ sendResult);
						}

						@Override
						public void onException(Throwable e) {
							log.error("RocketMQ Message hasn't been sent. Caused by "
									+ e.getMessage());
							if (getSendFailureChannel() != null) {
								getSendFailureChannel().send(
										RocketMQMessageHandler.this.errorMessageStrategy
												.buildErrorMessage(new MessagingException(
														finalMessage, e), null));
							}
						}
				}; //end sendCallback
     
				if (needSelectQueue) {
						rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
								message, "", sendCallback,
								rocketMQTemplate.getProducer().getSendMsgTimeout());
				} else {
						rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
								sendCallback);
				}
			}// end else
            }//end else
   
            // 如果没有发送成功
            if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
                if (getSendFailureChannel() != null) {
                    this.getSendFailureChannel().send(message);
                } else {
                    throw new MessagingException(message, new MQClientException("message hasn't been sent", null));
                } // end else
            } //end if
        } catch (Exception e) {
            log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
            if (getSendFailureChannel() != null) {
                    getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(new MessagingException(message, e), null));
            } else {
                    throw new MessagingException(message, e);
            }
        }// end catch

	}//end handleMessageInternal
}

(7).总结