(1).Flux类关系图
// 注意该类是抽象类
public abstract class Flux<T>
// 实现了reactor.core.CorePublisher
// CorePublisher是Publisher的增加
implements CorePublisher<T> {
}
public interface CorePublisher<T>
// org.reactivestreams.Publisher
extends Publisher<T> {
}
(2).Flux.just(“1”,”2”)
public static <T> Flux<T> just(T... data) {
// ["1","2"]
return fromArray(data);
}
(3).Flux.fromArray
public static <T> Flux<T> fromArray(T[] array) {
// ["1","2"]
if (array.length == 0) { // false
return empty();
}
if (array.length == 1) { // false
return just(array[0]);
}
// 创建FluxArray包裹数组
// 调用onAssembly
return onAssembly(new FluxArray<>(array));
}
(4).FluxArray
FluxArray既是Flux的子类,而且还实现了:Publisher
final class FluxArray<T>
// 继承于:reactor.core.publisher.Flux
extends Flux<T>
// reactor.core.Fuseable
implements Fuseable,
// reactor.core.publisher.SourceProducer
SourceProducer<T> {
}
interface SourceProducer<O>
// reactor.core.Scannable
extends Scannable,
// org.reactivestreams.Publisher
Publisher<O> {
}
(5). Flux.onAssembly
// onAssembly 包裹结果集,可以Hooks可以TRACE整个过程
protected static <T> Flux<T> onAssembly(Flux<T> source) {
// source = FluxArray
Function<Publisher, Publisher> hook = Hooks.onEachOperatorHook;
if(hook != null) { // false
source = (Flux<T>) hook.apply(source);
}
if (Hooks.GLOBAL_TRACE) {
AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
source = (Flux<T>) Hooks.addAssemblyInfo(source, stacktrace);
}
return source;
}
(6).总结
- Flux.just会把数组包裹成:reactor.core.publisher.FluxArray
- 返回:reactor.core.publisher.FluxArray,因为FluxArray继承于:Flux并实现了:Publisher