(1). 背景
在某些业务场景下,我们会用到消息中间件.那么,在通过消息中间件发送消息时,如何保证消息100%不丢失呢?这就需要用到事务消息了.
最近在研究Eventuate.最初并没有打算用这套框架,但是想了想,还是少自研一些,尽量向成熟的开源框架靠拢,不满足需求的地方再自己做扩展.这样做有好处,也有坏处:坏处就是我需要花大量的时间把源码看完.
(2). 项目结构
lixin-macbook:event-example lixin$ tree
.
├── event-api-message # 生产者\消费者共用的业务模型和事件
│   ├── pom.xml
│   └── src
│       └── main
│           ├── java
│           │   └── help
│           │       └── lixin
│           │           └── domain
│           │               ├── Account.java
│           │               └── AccountDebited.java
│           └── resources
├── event-consumer # 消费者模块
│   ├── pom.xml
│   └── src
│       └── main
│           ├── java
│           │   └── help
│           │       └── lixin
│           │           └── consumer
│           │               ├── EventConsumerApp.java
│           │               ├── config
│           │               │   └── EventConsumerConfig.java
│           │               └── service
│           │                   ├── AccountEventConsumer.java
│           │                   ├── AggregateSupplier.java
│           │                   └── IdSupplier.java
│           └── resources
│               └── application.yaml
├── event-producer # 生产者模块
│   ├── pom.xml
│   └── src
│       └── main
│           ├── java
│           │   └── help
│           │       └── lixin
│           │           └── producer
│           │               ├── EventProducerApp.java
│           │               ├── config
│           │               │   └── ProducerConfig.java
│           │               └── controller
│           │                   └── PublisherController.java
│           └── resources
│               └── application.yaml
├── pom.xml
└── sql # sql脚本
    └── eventuate.sql
(3). eventuate.sql
-- Session setup for the script: utf8mb4 as the connection character set, and
-- foreign-key checks switched off while the tables below are dropped and recreated.
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cdc_monitoring
-- One row per reader_id (primary key) with a nullable numeric last_time.
-- NOTE(review): last_time looks like an epoch timestamp -- confirm the unit.
-- ----------------------------
DROP TABLE IF EXISTS `cdc_monitoring`;
CREATE TABLE `cdc_monitoring` (
    `reader_id` VARCHAR(255) NOT NULL,
    `last_time` BIGINT DEFAULT NULL,
    PRIMARY KEY (`reader_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for entities
-- One row per (entity_type, entity_id) pair, carrying that entity's version.
-- ----------------------------
DROP TABLE IF EXISTS `entities`;
CREATE TABLE `entities` (
    `entity_type` VARCHAR(255) NOT NULL,
    `entity_id` VARCHAR(255) NOT NULL,
    `entity_version` LONGTEXT NOT NULL,
    -- No separate (entity_type, entity_id) secondary index: the primary key
    -- already indexes exactly those columns in that order, so a second copy
    -- would only add write and storage overhead.
    PRIMARY KEY (`entity_type`, `entity_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for events
-- One row per event_id (primary key). Secondary indexes support lookups by
-- (entity_type, entity_id, event_id) and by (published, event_id).
-- NOTE(review): published defaults to 0; presumably 0 = not yet relayed -- confirm.
-- ----------------------------
DROP TABLE IF EXISTS `events`;
CREATE TABLE `events` (
    `event_id` VARCHAR(255) NOT NULL,
    `event_type` LONGTEXT,
    `event_data` LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
    `entity_type` VARCHAR(255) NOT NULL,
    `entity_id` VARCHAR(255) NOT NULL,
    `triggering_event` LONGTEXT,
    `metadata` LONGTEXT,
    `published` TINYINT DEFAULT '0',
    PRIMARY KEY (`event_id`),
    KEY `events_idx` (`entity_type`, `entity_id`, `event_id`),
    KEY `events_published_idx` (`published`, `event_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for message
-- One row per message id (primary key), holding destination, headers and payload.
-- The (published, id) index supports scans ordered by id within a published value.
-- NOTE(review): presumably this is the outbox table the JDBC producer writes to
-- and the CDC service reads from -- confirm against the framework docs.
-- ----------------------------
DROP TABLE IF EXISTS `message`;
CREATE TABLE `message` (
    `id` VARCHAR(255) NOT NULL,
    `destination` LONGTEXT NOT NULL,
    `headers` LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
    `payload` LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
    `published` SMALLINT DEFAULT '0',
    `message_partition` SMALLINT DEFAULT NULL,
    `creation_time` BIGINT DEFAULT NULL,
    PRIMARY KEY (`id`),
    KEY `message_published_idx` (`published`, `id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for offset_store
-- One row per client_name (primary key) with an optional serialized offset blob.
-- ----------------------------
DROP TABLE IF EXISTS `offset_store`;
CREATE TABLE `offset_store` (
    `client_name` VARCHAR(255) NOT NULL,
    `serialized_offset` LONGTEXT,
    PRIMARY KEY (`client_name`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for received_messages
-- The composite primary key allows at most one row per (consumer_id, message_id).
-- NOTE(review): presumably used for consumer-side duplicate detection -- confirm.
-- ----------------------------
DROP TABLE IF EXISTS `received_messages`;
CREATE TABLE `received_messages` (
    `consumer_id` VARCHAR(255) NOT NULL,
    `message_id` VARCHAR(255) NOT NULL,
    `creation_time` BIGINT DEFAULT NULL,
    `published` SMALLINT DEFAULT '0',
    PRIMARY KEY (`consumer_id`, `message_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for saga_instance
-- One row per (saga_type, saga_id), holding the saga's current state name,
-- status flags and its serialized data.
-- ----------------------------
DROP TABLE IF EXISTS `saga_instance`;
CREATE TABLE `saga_instance` (
    `saga_type` VARCHAR(255) NOT NULL,
    `saga_id` VARCHAR(100) NOT NULL,
    `state_name` VARCHAR(100) NOT NULL,
    `last_request_id` VARCHAR(100) DEFAULT NULL,
    `end_state` INT DEFAULT NULL,
    `compensating` INT DEFAULT NULL,
    `failed` INT DEFAULT NULL,
    `saga_data_type` VARCHAR(1000) NOT NULL,
    -- Widened from VARCHAR(1000): serialized JSON has no natural 1000-character
    -- bound, and under strict SQL mode an over-length value fails the insert.
    -- LONGTEXT matches the other payload columns in this schema.
    `saga_data_json` LONGTEXT NOT NULL,
    PRIMARY KEY (`saga_type`, `saga_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for saga_instance_participants
-- Every column is part of the primary key, so each
-- (saga_type, saga_id, destination, resource) combination appears at most once.
-- ----------------------------
DROP TABLE IF EXISTS `saga_instance_participants`;
CREATE TABLE `saga_instance_participants` (
    `saga_type` VARCHAR(255) NOT NULL,
    `saga_id` VARCHAR(100) NOT NULL,
    `destination` VARCHAR(100) NOT NULL,
    `resource` VARCHAR(100) NOT NULL,
    PRIMARY KEY (`saga_type`, `saga_id`, `destination`, `resource`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for saga_lock_table
-- One row per target (primary key), naming the saga (type + id) tied to it.
-- NOTE(review): `saga_Id` is spelled with a capital I here, unlike `saga_id`
-- in the other saga tables; the spelling is kept exactly as in the original.
-- ----------------------------
DROP TABLE IF EXISTS `saga_lock_table`;
CREATE TABLE `saga_lock_table` (
    `target` VARCHAR(100) NOT NULL,
    `saga_type` VARCHAR(255) NOT NULL,
    `saga_Id` VARCHAR(100) NOT NULL,
    PRIMARY KEY (`target`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for saga_stash_table
-- One row per message_id (primary key), storing a message's headers and
-- payload together with the target and saga (type + id) it relates to.
-- ----------------------------
DROP TABLE IF EXISTS `saga_stash_table`;
CREATE TABLE `saga_stash_table` (
    `message_id` VARCHAR(100) NOT NULL,
    `target` VARCHAR(100) NOT NULL,
    `saga_type` VARCHAR(255) NOT NULL,
    `saga_id` VARCHAR(100) NOT NULL,
    -- Widened from VARCHAR(1000) so that any headers/payload accepted by the
    -- `message` table (LONGTEXT there) can also be stored here; under strict
    -- SQL mode an over-length value would otherwise fail the insert.
    `message_headers` LONGTEXT NOT NULL,
    `message_payload` LONGTEXT NOT NULL,
    PRIMARY KEY (`message_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;
-- ----------------------------
-- Table structure for snapshots
-- One row per (entity_type, entity_id, entity_version), holding the snapshot's
-- type and JSON body plus an optional triggering_events value.
-- ----------------------------
DROP TABLE IF EXISTS `snapshots`;
CREATE TABLE `snapshots` (
    `entity_type` VARCHAR(255) NOT NULL,
    `entity_id` VARCHAR(255) NOT NULL,
    `entity_version` VARCHAR(255) NOT NULL,
    `snapshot_type` LONGTEXT NOT NULL,
    `snapshot_json` LONGTEXT NOT NULL,
    `triggering_events` LONGTEXT,
    PRIMARY KEY (`entity_type`, `entity_id`, `entity_version`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci;

-- Switch foreign-key checks back on for the rest of the session.
SET FOREIGN_KEY_CHECKS = 1;
(4). 定义业务模型
package help.lixin.domain;
/**
 * Business model for an account.
 *
 * <p>The class declares no fields or methods of its own.
 */
public class Account {
}
(5). DomainEvent
package help.lixin.domain;
import io.eventuate.tram.events.common.DomainEvent;
/**
 * Domain event carrying a single {@code long} amount.
 *
 * <p>NOTE(review): the no-argument constructor and the setter are presumably
 * there so a serialization framework can instantiate and populate the event;
 * confirm before removing either.
 */
public class AccountDebited implements DomainEvent {

    /** Amount carried by this event. */
    private long amount;

    /** Creates an event whose amount is left at the {@code long} default of 0. */
    public AccountDebited() {
    }

    /**
     * Creates an event with the given amount.
     *
     * @param amount value to store in the event
     */
    public AccountDebited(long amount) {
        this.amount = amount;
    }

    /**
     * @return the amount currently stored in the event
     */
    public long getAmount() {
        return amount;
    }

    /**
     * Replaces the stored amount.
     *
     * @param amount new value
     */
    public void setAmount(long amount) {
        this.amount = amount;
    }
}
(6). 为业务模型工程添加依赖
<!--
  Eventuate Tram events API for the shared model module.
  Scope "provided": needed to compile this module but not passed on
  transitively, so modules that use the shared model declare it themselves.
-->
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-events</artifactId>
    <version>${eventuate-tram.version}</version>
    <scope>provided</scope>
</dependency>
(7). 定义生产者配置
package help.lixin.producer.config;
import io.eventuate.tram.spring.events.publisher.TramEventsPublisherConfiguration;
import io.eventuate.tram.spring.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
 * Spring configuration for the producer module.
 *
 * <p>Declares no beans itself; it only imports the Eventuate Tram
 * events-publisher configuration and the JDBC message-producer configuration.
 */
@Configuration
@Import({
        TramEventsPublisherConfiguration.class,
        TramMessageProducerJdbcConfiguration.class
})
public class ProducerConfig {
}
(8). 定义生产者发布事件
package help.lixin.producer.controller;
import help.lixin.domain.Account;
import help.lixin.domain.AccountDebited;
import io.eventuate.tram.events.common.DomainEvent;
import io.eventuate.tram.events.publisher.DomainEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
/**
 * REST controller exposing one endpoint that publishes a sample
 * {@link AccountDebited} domain event through Eventuate Tram.
 */
@RestController
public class PublisherController {

    /** Publisher used to emit domain events; injected by Spring. */
    @Autowired
    private DomainEventPublisher domainEventPublisher;

    /**
     * Handles {@code GET /publish}: builds one {@link AccountDebited} event and
     * publishes it for the {@link Account} aggregate type.
     *
     * @return the literal string {@code "SUCCESS"} once the publish call has returned
     */
    @GetMapping("/publish")
    public String publish() {
        // The current epoch-millisecond value serves as both the aggregate id
        // and the event's amount, so every call produces a distinct sample.
        final Long eventKey = System.currentTimeMillis();
        final DomainEvent debited = new AccountDebited(eventKey);

        // Publish the event.
        domainEventPublisher.publish(
                Account.class.getName(),
                eventKey,
                Collections.singletonList(debited));

        return "SUCCESS";
    }
}
(9). 生产者配置文件
注意:生产者强依赖于数据库(需要配置数据源).
# Producer application settings.
# Nesting matters: port belongs under server, and the four connection
# settings belong under spring.datasource.
server:
  port: 9091
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/eventuate
    username: root
    # NOTE(review): demo credentials in plain text; do not reuse outside a local setup.
    password: 123456
    # NOTE(review): newer MySQL Connector/J releases name this class
    # com.mysql.cj.jdbc.Driver; confirm against the connector version in use.
    driver-class-name: com.mysql.jdbc.Driver
(10). 生产者依赖配置
<!-- Shared business model and events (event-api-message module). -->
<dependency>
    <groupId>help.lixin</groupId>
    <artifactId>event-api-message</artifactId>
    <version>${project.version}</version>
</dependency>

<!-- MySQL JDBC driver; no version is given here, so it must be managed elsewhere. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- Eventuate Tram: events API, JDBC message producer, Spring events publisher. -->
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-events</artifactId>
    <version>${eventuate-tram.version}</version>
</dependency>
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-spring-producer-jdbc</artifactId>
    <version>${eventuate-tram.version}</version>
</dependency>
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-spring-events-publisher</artifactId>
    <version>${eventuate-tram.version}</version>
</dependency>

<!-- Spring MVC starter for the REST controller. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
(11). 定义消费者配置类
package help.lixin.consumer.config;
import help.lixin.consumer.service.AccountEventConsumer;
import help.lixin.domain.Account;
import io.eventuate.tram.consumer.common.DuplicateMessageDetector;
import io.eventuate.tram.consumer.common.NoopDuplicateMessageDetector;
import io.eventuate.tram.events.subscriber.DomainEventDispatcher;
import io.eventuate.tram.events.subscriber.DomainEventDispatcherFactory;
import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration;
import io.eventuate.tram.spring.events.subscriber.TramEventSubscriberConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
 * Spring configuration for the consumer module.
 *
 * <p>Imports the Eventuate Tram event-subscriber and Kafka message-consumer
 * configurations, then registers the beans that connect
 * {@link AccountEventConsumer} to a {@link DomainEventDispatcher}.
 */
@Configuration
@Import({
        TramEventSubscriberConfiguration.class,
        EventuateTramKafkaMessageConsumerConfiguration.class
})
public class EventConsumerConfig {

    /**
     * Registers a {@link NoopDuplicateMessageDetector} as the duplicate detector.
     *
     * <p>NOTE(review): going by its name this detector performs no duplicate
     * detection, so handlers may see the same message more than once; confirm.
     *
     * @return a new no-op detector
     */
    @Bean
    public DuplicateMessageDetector noopDuplicateMessageDetector() {
        return new NoopDuplicateMessageDetector();
    }

    /**
     * Creates the event consumer for the {@link Account} aggregate type.
     *
     * @return a consumer keyed by the fully-qualified class name of {@link Account}
     */
    @Bean
    public AccountEventConsumer accountEventConsumer() {
        final String aggregateType = Account.class.getName();
        return new AccountEventConsumer(aggregateType);
    }

    /**
     * Builds the dispatcher that routes incoming domain events to the consumer's handlers.
     *
     * @param domainEventDispatcherFactory dispatcher factory; per the original
     *                                     author's note it internally holds a MessageConsumer
     * @param target                       the {@link AccountEventConsumer} bean declared above
     * @return a dispatcher registered under the id {@code "eventDispatcherId"}
     */
    @Bean
    public DomainEventDispatcher domainEventDispatcher(
            DomainEventDispatcherFactory domainEventDispatcherFactory,
            AccountEventConsumer target) {
        return domainEventDispatcherFactory.make(
                "eventDispatcherId",
                target.domainEventHandlers());
    }
}
(12). 定义消费者类
package help.lixin.consumer.service;
import help.lixin.domain.AccountDebited;
import io.eventuate.tram.events.subscriber.DomainEventEnvelope;
import io.eventuate.tram.events.subscriber.DomainEventHandlers;
import io.eventuate.tram.events.subscriber.DomainEventHandlersBuilder;
/**
 * Consumes domain events for one aggregate type and prints each
 * {@link AccountDebited} event it receives.
 */
public class AccountEventConsumer {

    /** Aggregate type whose events this consumer subscribes to. */
    private final String aggregateType;

    /**
     * @param aggregateType aggregate type name used when building the handlers
     */
    public AccountEventConsumer(String aggregateType) {
        this.aggregateType = aggregateType;
    }

    /**
     * Builds the handler registrations for this consumer.
     *
     * @return handlers that route {@link AccountDebited} events of the
     *         configured aggregate type to {@link #handleAccountDebited}
     */
    public DomainEventHandlers domainEventHandlers() {
        final DomainEventHandlers handlers = DomainEventHandlersBuilder
                // Aggregate root: Account
                .forAggregateType(aggregateType)
                // Event handling
                .onEvent(AccountDebited.class, this::handleAccountDebited)
                .build();
        return handlers;
    }

    /**
     * Handles one {@link AccountDebited} event by writing its envelope to standard output.
     *
     * @param event envelope wrapping the received event
     */
    public void handleAccountDebited(DomainEventEnvelope<AccountDebited> event) {
        System.out.println(event);
    }
}
(13). 消费者配置文件
# Consumer application settings.
server:
  # Must differ from the Kafka broker port below: both run on the same host in
  # this walkthrough, and two processes cannot listen on 9092 at once.
  port: 9093
eventuatelocal:
  kafka:
    bootstrap:
      servers: 127.0.0.1:9092
(14). 定义消费者依赖
<!-- Shared business model and events (event-api-message module). -->
<dependency>
    <groupId>help.lixin</groupId>
    <artifactId>event-api-message</artifactId>
    <version>${project.version}</version>
</dependency>

<!-- Eventuate Tram: events API, Spring events subscriber, Kafka message consumer. -->
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-events</artifactId>
    <version>${eventuate-tram.version}</version>
</dependency>
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-spring-events-subscriber</artifactId>
    <version>${eventuate-tram.version}</version>
</dependency>
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-spring-consumer-kafka</artifactId>
    <version>${eventuate-tram.version}</version>
</dependency>

<!-- Spring Boot web starter. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
(15). 配置启动:eventuate-cdc-service
#!/bin/bash
# Starts a locally built eventuate-cdc-service jar, configured entirely
# through the environment variables exported below.

# JDBC connection.
export SPRING_DATASOURCE_DRIVER_CLASS_NAME=com.mysql.jdbc.Driver
export SPRING_DATASOURCE_URL=jdbc:mysql://localhost:3306/eventuate
export SPRING_DATASOURCE_USERNAME=root
export SPRING_DATASOURCE_PASSWORD=123456

# Database account used by the CDC reader.
export EVENTUATELOCAL_CDC_DB_USER_NAME=root
export EVENTUATELOCAL_CDC_DB_PASSWORD=123456

# NOTE(review): the plain `java` launcher does not read JAVA_OPTS, so this
# export has no effect on the command at the bottom of the script.
export JAVA_OPTS=-Xmx64m

export EVENTUATELOCAL_CDC_OFFSET_STORE_KEY=MySqlBinlog
export EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC=false
# EVENTUATE_CDC_OUTBOX_PARTITIONING_OUTBOX_TABLES=
export EVENTUATELOCAL_CDC_READER_NAME=MySqlReader
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-18.0.1.1.jdk/Contents/Home
export EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID=1234567890

# Kafka and ZooKeeper endpoints.
export EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING=localhost:2181

export EVENTUATE_OUTBOX_ID=1

# The jar is not downloaded by this script: clone the sources and build it yourself.
# https://github.com/eventuate-foundation/eventuate-cdc
#
# JVM options must come before -jar; anything after the jar path is handed to
# the application as a program argument instead of being applied to the JVM.
java -Xmx256m -jar ./eventuate-cdc-service-0.15.0-SNAPSHOT.jar
# For the related settings, see: EventuateConfigurationProperties
# (A .properties comment must start with '#' or '!'; a line starting with '//'
# would be parsed as a key named "//".)

# Actuator endpoints and logging.
management.endpoint.health.show-details=ALWAYS
management.endpoints.web.exposure.include=prometheus,health
logging.level.root=INFO

# JDBC connection.
spring.datasource.test.on.borrow=true
spring.datasource.validation.query=SELECT 1
spring.datasource.driver.class.name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/eventuate
spring.datasource.username=root
spring.datasource.password=123456

# CDC reader.
eventuatelocal.cdc.db.user.name=root
eventuatelocal.cdc.db.password=123456
eventuatelocal.cdc.offset.store.key=MySqlBinlog
eventuatelocal.cdc.read.old.debezium.db.offset.storage.topic=false
eventuatelocal.cdc.reader.name=MySqlReader
eventuatelocal.cdc.mysql.binlog.client.unique.id=1234567890

# Kafka and ZooKeeper endpoints.
eventuatelocal.kafka.bootstrap.servers=localhost:9092
eventuatelocal.zookeeper.connection.string=localhost:2181

eventuate.outbox.id=1
(16). 启动项目
1. 启动mysql(注意:mysql需要开启binlog)
2. 启动zookeeper
3. 启动kafka
4. 启动eventuate-cdc-service
5. 启动producer
6. 启动consumer
(17). 测试
lixin-macbook:~ lixin$ curl http://localhost:9091/publish
SUCCESS
(18). 验证
验证消费者端是否有消息消费成功.