(1). 概述

最近休息,对事务消息发件箱模式比较感兴趣,闲逛github时,发现有向我推一个新的框架,特意拿下来看下源码。

(2). Transaction Outbox优势

跨平台数据库的支持,比较轻量级,对于事务消息同步到其它存储设备(比如:MQ)的话,是需要自己去实现的。

(3). Transaction Outbox类图

Transaction Outbox类图

(4). Transaction Outbox官方案例

@RestController
class EventuallyConsistentController {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(EventuallyConsistentController.class);

  @Autowired private CustomerRepository customerRepository;
  @Autowired private TransactionOutbox outbox;
  @Autowired private EventRepository eventRepository;
  @Autowired private EventPublisher eventPublisher;

  @SuppressWarnings("SameReturnValue")
  @RequestMapping("/createCustomer")
  @Transactional
  public String createCustomer() {
    LOGGER.info("Creating customers");
	// ***************************************************************************
	// schedule实际是对:EventPublisher类,进行AOP来着的
	// ***************************************************************************
    outbox
        .schedule(EventPublisher.class) // Just a trick to get autowiring to work.
        .publish(1L, "Created customers", LocalDateTime.now());
    customerRepository.save(new Customer(1L, "Martin", "Carthy"));
    customerRepository.save(new Customer(2L, "Dave", "Pegg"));
    LOGGER.info("Customers created");
    return "Done";
  }
} // end  EventuallyConsistentController



@Service
class EventPublisher {

  @Autowired 
  private EventRepository eventRepository;

  public void publish(long id, String description, LocalDateTime time) {
    eventRepository.save(new Event(id, description, time));
  }
} // end EventPublisher

(5). Transaction Outbox大体原理

1. 对业务对象:EventPublisher进行AOP代理。
2. 调用业务对象的方法:publish(1L, "Created customers", LocalDateTime.now()),实际被AOP进行了处理来着的
3. 把方面业务对象的方法,参数类型,参数,进行JSON序列化
4. 创建TransactionOutboxEntry对象,包裹着上面的JSON以及一些其它信息。
5. 调用:Persistor.save方法,把TransactionOutboxEntry进行持久化。
6. 为:Transaction对象,添加Hook方法,即:事务提交后触发事件。
7. Hook方法进行两件事情,其中之一为:回调:TransactionOutboxListener
8. 另一个则是:回调:Submitter

(6). Transaction Outbox部份源码摘要

private <T> T schedule(Class<T> clazz, String uniqueRequestId) {
    if (!initialized.get()) {
      throw new IllegalStateException("Not initialized");
    }
    return proxyFactory.createProxy(
        clazz,
        (method, args) ->
            uncheckedly(
                () -> {
				// **************************************************************
				// 1. 对方法和参数通过:TransactionalInvocation对象进行包裹
				// **************************************************************  
                  var extracted = transactionManager.extractTransaction(method, args);
				  
				  // **************************************************************
				  // 2. 创建:TransactionOutboxEntry对象
				  // **************************************************************  
                  TransactionOutboxEntry entry =
                      newEntry(
                          extracted.getClazz(),
                          extracted.getMethodName(),
                          extracted.getParameters(),
                          extracted.getArgs(),
                          uniqueRequestId);
                  validator.validate(entry);
				  
				  // **************************************************************  
				  // 3. 把TransactionOutboxEntry对象进行持久化(注意:与业务是在同一个事务之内来着的)
				  // **************************************************************  
                  persistor.save(extracted.getTransaction(), entry);
				  
                  extracted
                      .getTransaction()
                      .addPostCommitHook(
                          () -> {
							// **************************************************************  
							// 4. 回调:TransactionOutboxListener
							// **************************************************************  
                            listener.scheduled(entry);
							
							// **************************************************************
							// 5. 回调:Submitter,注意:当方法调用成功后,
							// 5.1 会剔除:TXNO_OUTBOX表中的数据来着的。
							// **************************************************************  
                            submitNow(entry);
                          });
                  log.debug(
                      "Scheduled {} for running after transaction commit", entry.description());
                  return null;
                }));
  }

TXNO_OUTBOX表结构

CREATE TABLE `TXNO_OUTBOX` (
  `id` varchar(36) NOT NULL,
  `invocation` mediumtext,
  `lastAttemptTime` timestamp(6) NULL DEFAULT NULL,
  `nextAttemptTime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
  `attempts` int(11) DEFAULT NULL,
  `blocked` varchar(250) DEFAULT NULL,
  `version` int(11) DEFAULT NULL,
  `uniqueRequestId` varchar(250) DEFAULT NULL,
  `processed` tinyint(1) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniqueRequestId` (`uniqueRequestId`),
  KEY `IX_TXNO_OUTBOX_1` (`processed`,`blocked`,`nextAttemptTime`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;