(1). 概述

在前面对ZeebeClient的构建过程,以及API有所了解了,这一小篇,对ZeebeClientFutureImpl的源码进行一个剖析,因为,该类,承载着Client与GRPC返回结果的中间件.

(2). ZeebeClientFutureImpl

public class ZeebeClientFutureImpl<ClientResponse, BrokerResponse>
     // ********************************************** 
	 // 1. 继承于:CompletableFuture,CompletableFuture是对Java多线程进行编排的工具.
	 //    该类在此处不进行详细介绍
	 // ********************************************** 
    extends CompletableFuture<ClientResponse>
	// ********************************************** 
	// 2. 实现了:ZeebeFuture
	// ********************************************** 
    implements ZeebeFuture<ClientResponse>,
	// ********************************************** 
	// 3. 实现了:io.grpc.stub.StreamObserver
	// ********************************************** 
	StreamObserver<BrokerResponse> {
    // ... ...
}		

(3). ZeebeFuture

public interface ZeebeFuture<T> 
       //  *******************************************
	   // JDK基础,不进行深入的讲解了
	   // 继承于:java.util.concurrent.Future
	   //  *******************************************
       extends Future<T>, CompletionStage<T> {
	   // 为什么感觉这两个方法,象极了:CompletableFuture.get()/get(long timeout, TimeUnit unit)
	T join();
	
	T join(long timeout, TimeUnit unit);
}	

(4). StreamObserver

// ************************************************
// GRPC的底层,也不进行深入了,但是这个接口的签名,代表了GRPC应该是一个Reactor(响应式流)的实现.
// ************************************************
public interface StreamObserver<V>  {
	
	void onNext(V value);
	
	void onError(Throwable t);
	
	void onCompleted();
}	

(5). ZeebeClientFutureImpl

public class ZeebeClientFutureImpl<ClientResponse, BrokerResponse>
    extends CompletableFuture<ClientResponse>
    implements ZeebeFuture<ClientResponse>, StreamObserver<BrokerResponse> {

   private final Function<BrokerResponse, ClientResponse> responseMapper;
   
     public ZeebeClientFutureImpl() {
       this(brokerResponse -> null);
     }
   
     public ZeebeClientFutureImpl(final Function<BrokerResponse, ClientResponse> responseMapper) {
       this.responseMapper = responseMapper;
     }
   
     @Override
     public ClientResponse join() {
       try {
		 // *************************************************
		 // 实际是调用:CompletableFuture.get,在这里的结果实际来于自GRPC StreamObserver.onNext
		 // *************************************************
         return get();
       } catch (final ExecutionException e) {
        // ... ...
     }
   
     @Override
     public ClientResponse join(final long timeout, final TimeUnit unit) {
       try {
         // *************************************************
		 // 实际是调用:CompletableFuture.get(final long timeout, final TimeUnit unit)
		 // 在这里的结果实际来于自GRPC StreamObserver.onNext
		 // *************************************************
         return get(timeout, unit);
       } catch (final ExecutionException e) {
        // ... ...
       }
     }
   
     // *****************************************************************
	 // 重点:
	 // 在这里实际调用了:CompletableFuture.complete(Object)
	 // 为CompletableFuture配置了返回的结果,让我们可以通过CompletableFuture.get()方法获得返回的结果
	 // Zeebe真的是坑人不浅,用了CompletableFuture的API,却仅仅只是做了一个手工的设置结果. 
	 // *****************************************************************
     @Override
     public void onNext(final BrokerResponse brokerResponse) {
       try {
         complete(responseMapper.apply(brokerResponse));
       } catch (final Exception e) {
         completeExceptionally(e);
       }
     }
   
     @Override
     public void onError(final Throwable throwable) {
       completeExceptionally(throwable);
     }
   
     private RuntimeException transformExecutionException(final ExecutionException e) {
       final Throwable cause = e.getCause();
   
       if (cause instanceof StatusRuntimeException) {
         final Status status = ((StatusRuntimeException) cause).getStatus();
         throw new ClientStatusException(status, e);
       } else {
         throw new ClientException(e);
       }
     } // end transformExecutionException

}

(6). 总结

在看ZeebeClientFutureImpl的源码时,看了接口的能力,会以为它会利用CompletableFuture进行线程的编排,结果,他并没有那么做,差点把自己给绕进去了.