(1). 前言

业务知识请参考该文章“ServiceComb中的数据最终一致性方案 - part 1”

(2). 租车服务(car)

// 1. 租车实体对象
package help.lixin.saga.example;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIgnore;

@JsonAutoDetect(fieldVisibility = Visibility.ANY)
class CarBooking {
  private Integer id;
  private String name;
  private Integer amount;
  private boolean confirmed;
  private boolean cancelled;

  Integer getId() {
    return id;
  }

  void setId(Integer id) {
    this.id = id;
  }

  String getName() {
    return name;
  }

  void setName(String name) {
    this.name = name;
  }

  Integer getAmount() {
    return amount;
  }

  void setAmount(Integer amount) {
    this.amount = amount;
  }

  boolean isConfirmed() {
    return confirmed;
  }

  void confirm() {
    this.confirmed = true;
    this.cancelled = false;
  }

  boolean isCancelled() {
    return cancelled;
  }

  void cancel() {
    this.confirmed = false;
    this.cancelled = true;
  }
}


// 2. CarBookingController
package help.lixin.saga.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@RestController
public class CarBookingController {
  @Autowired
  private CarBookingService service;

  private final AtomicInteger id = new AtomicInteger(0);

  @CrossOrigin
  @GetMapping("/bookings") List<CarBooking> getAll() {
    return new ArrayList<>(service.getAllBookings());
  }

  @PostMapping("/order/{name}/{cars}")
  CarBooking order(@PathVariable String name, @PathVariable Integer cars) {
    CarBooking booking = new CarBooking();
    booking.setId(id.incrementAndGet());
    booking.setName(name);
    booking.setAmount(cars);
    service.order(booking);
    return booking;
  }

  @DeleteMapping("/bookings")
  void clear() {
    service.clearAllBookings();
    id.set(0);
  }
}


// 3. CarBookingService
package help.lixin.saga.example;

import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.springframework.stereotype.Service;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
class CarBookingService {
	private Map<Integer, CarBooking> bookings = new ConcurrentHashMap<>();

    // *************************************************************************
	// 分支事务,需要使用注解@Compensable,并指定补偿方法.
	// 注意:canal的方法签名要与参与事务的方法签名一样的.
	// *************************************************************************
	@Compensable(compensationMethod = "cancel")
	void order(CarBooking booking) {
		// 订车的数量大于10的情况下,整个分布式事务要求rollback
		if (booking.getAmount() > 10) {
			throw new IllegalArgumentException("can not order the cars large than ten");
		}
		booking.confirm();
		bookings.put(booking.getId(), booking);
	}

	void cancel(CarBooking booking) {
		Integer id = booking.getId();
		if (bookings.containsKey(id)) {
			bookings.get(id).cancel();
		}
		// Just sleep a while to ensure the Compensated event is after ordering TxAbort
		// event
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// Just ignore the exception
		}
	}

	Collection<CarBooking> getAllBookings() {
		return bookings.values();
	}

	void clearAllBookings() {
		bookings.clear();
	}
}


// 4. application.properties
spring.application.name=car
server.port=8082
alpha.cluster.address=localhost:8080

(3). 酒店服务(hotel)

// 1. HotelBooking实体
package help.lixin.saga.example;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIgnore;

@JsonAutoDetect(fieldVisibility = Visibility.ANY)
class HotelBooking {
  private Integer id;
  private String name;
  private Integer amount;
  private boolean confirmed;
  private boolean cancelled;

  Integer getId() {
    return id;
  }

  void setId(Integer id) {
    this.id = id;
  }

  String getName() {
    return name;
  }

  void setName(String name) {
    this.name = name;
  }

  Integer getAmount() {
    return amount;
  }

  void setAmount(Integer amount) {
    this.amount = amount;
  }

  boolean isConfirmed() {
    return confirmed;
  }

  void confirm() {
    this.confirmed = true;
    this.cancelled = false;
  }

  boolean isCancelled() {
    return cancelled;
  }

  void cancel() {
    this.confirmed = false;
    this.cancelled = true;
  }
}


// 2. HotelBookingController
package help.lixin.saga.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@RestController
public class HotelBookingController {
  @Autowired
  private HotelBookingService service;

  private final AtomicInteger id = new AtomicInteger(0);

  @CrossOrigin
  @GetMapping("/bookings")
  List<HotelBooking> getAll() {
    return new ArrayList<>(service.getAllBookings());
  }

  @PostMapping("/order/{name}/{rooms}")
  HotelBooking order(@PathVariable String name, @PathVariable Integer rooms) {
    HotelBooking booking = new HotelBooking();
    booking.setId(id.incrementAndGet());
    booking.setName(name);
    booking.setAmount(rooms);
    service.order(booking);
    return booking;
  }

  @DeleteMapping("/bookings")
  void clear() {
    service.clearAllBookings();
    id.set(0);
  }
}


// 3. HotelBookingService
package help.lixin.saga.example;

import org.apache.servicecomb.pack.omega.transaction.annotations.Compensable;
import org.springframework.stereotype.Service;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
class HotelBookingService {
  private Map<Integer, HotelBooking> bookings = new ConcurrentHashMap<>();

  // *************************************************************************
  // 分支事务,需要使用注解@Compensable,并指定补偿方法.
  // 注意:canal的方法签名要与参与事务的方法签名一样的.
  // *************************************************************************
  @Compensable(compensationMethod = "cancel")
  void order(HotelBooking booking) {
	 // 当订酒店的数量大于2的时候,整个分布式事务会rollback
    if (booking.getAmount() > 2) {
      throw new IllegalArgumentException("can not order the rooms large than two");
    }
    booking.confirm();
    bookings.put(booking.getId(), booking);
  }

  void cancel(HotelBooking booking) {
    Integer id = booking.getId();
    if (bookings.containsKey(id)) {
      bookings.get(id).cancel();
    }
  }

  Collection<HotelBooking> getAllBookings() {
    return bookings.values();
  }

  void clearAllBookings() {
    bookings.clear();
  }
}


// 4. application.properties
spring.application.name=hotel
server.port=8081
alpha.cluster.address=localhost:8080

(4). 预订服务(booking)

// 1. BookingController
package help.lixin.saga.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.servicecomb.pack.omega.context.annotations.SagaStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@RestController
public class BookingController {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Value("${car.service.address:http://car.servicecomb.io:8080}")
  private String carServiceUrl;

  @Value("${hotel.service.address:http://hotel.servicecomb.io:8080}")
  private String hotelServiceUrl;

  @Autowired
  private RestTemplate template;

  // ***************************************************************
  // 定义全局事务@SagaStart
  // ***************************************************************
  @SagaStart
  @PostMapping("/booking/{name}/{rooms}/{cars}")
  public String order( // 用户信息
		              @PathVariable String name, 
		              // 预计包房数量
		              @PathVariable Integer rooms,
		              // 租车数量
                      @PathVariable Integer cars) throws Throwable {

    if (cars < 0) {
      throw new Exception("The cars order quantity must be greater than 0");
    }

    // 1. 租车
    template.postForEntity(
        carServiceUrl + "/order/{name}/{cars}",
        null, String.class, name, cars);

    postCarBooking();

    if (rooms < 0) {
      throw new Exception("The rooms order quantity must be greater than 0");
    }

    // 2. 订酒店
    template.postForEntity(
        hotelServiceUrl + "/order/{name}/{rooms}",
        null, String.class, name, rooms);

    postBooking();

    return name + " booking " + rooms + " rooms and " + cars + " cars OK";
  }

  // This method is used by the byteman to inject exception here
  private void postCarBooking() throws Throwable {

  }

  // This method is used by the byteman to inject the faults such as the timeout or the crash
  private void postBooking() throws Throwable {

  }

  // This method is used by the byteman trigger shutdown the master node in the Alpha server cluster
  private void alphaMasterShutdown() {
    String alphaRestAddress = System.getenv("alpha.rest.address");
    LOG.info("alpha.rest.address={}", alphaRestAddress);
    List<String> addresss = Arrays.asList(alphaRestAddress.split(","));

    addresss.stream().filter(address -> {
      // use the actuator alpha endpoint to find the alpha master node
      try {
        ResponseEntity<String> responseEntity = template
            .getForEntity(address + "/actuator/alpha", String.class);
        ObjectMapper mapper = new ObjectMapper();
        if (responseEntity.getStatusCode() == HttpStatus.OK) {
          String json = responseEntity.getBody();
          Map<String, String> map = mapper.readValue(json, Map.class);
          if (map.get("nodeType").equalsIgnoreCase("MASTER")) {
            return true;
          }
        }
      } catch (Exception ex) {
        LOG.error("", ex);
      }
      return false;
    }).forEach(address -> {
      // call shutdown endpoint to shutdown the alpha master node
      HttpHeaders headers = new HttpHeaders();
      headers.setContentType(MediaType.APPLICATION_JSON);
      HttpEntity request = new HttpEntity(headers);
      ResponseEntity<String> responseEntity = template
          .postForEntity(address + "/actuator/shutdown", request, String.class);
      if (responseEntity.getStatusCode() == HttpStatus.OK) {
        LOG.info("Alpah master node {} shutdown", address);
      }
    });
  }
}


// 2. application.properties
spring.application.name=booking
server.port=8083
alpha.cluster.address=localhost:8080
hotel.service.address=http://localhost:8081
car.service.address=http://localhost:8082

(5). pom.xml

// 1. parent(pom.xml)定义公共的依赖
<properties>
	<maven.compiler.source>8</maven.compiler.source>
	<maven.compiler.target>8</maven.compiler.target>
	<servicecomb.version>0.6.0</servicecomb.version>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<modules>
	<module>car</module>
	<module>hotel</module>
	<module>booking</module>
</modules>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>Greenwich.RELEASE</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>2.1.0.RELEASE</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-netflix</artifactId>
			<version>2.1.1.RELEASE</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.servicecomb.pack</groupId>
			<artifactId>omega-spring-starter</artifactId>
			<version>${servicecomb.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.servicecomb.pack</groupId>
			<artifactId>omega-transport-resttemplate</artifactId>
			<version>${servicecomb.version}</version>
		</dependency>
	</dependencies>
</dependencyManagement>

// car/hotel/booking 定义依赖.
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.servicecomb.pack</groupId>
		<artifactId>omega-spring-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.servicecomb.pack</groupId>
		<artifactId>omega-transport-resttemplate</artifactId>
	</dependency>
</dependencies>

(6). 总结

  1. 在要开启分布式事务的最外层,只要加入一个注解(@SagaStart)即可.
  2. 在分支事务里,只需加入一个注解(@Compensable(compensationMethod = “xxx”)),并配置补偿的方法名称(方法签名要事务的方法签名保持一致).