Flux.just("1","2")
.subscribe(c->{
System.out.println("consumer:" + c);
});
public final Disposable subscribe(Consumer<? super T> consumer) {
// consumer = help.lixin.samples.Flux2Test$$Lambda$7
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
// consumer 订阅
// errorConsumer 错误处理
// completeConsumer 完成处理
return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
// null
@Nullable Context initialContext) {
return subscribeWith(
// 创建:reactor.core.publisher.LambdaSubscriber
// 包课着正确消费者/错误消费者/完成处理等...
new LambdaSubscriber<>(consumer, errorConsumer,completeConsumer,null,initialContext)
);
}
interface CoreSubscriber<T> extends Subscriber<T> {}
interface InnerConsumer<I>
extends CoreSubscriber<I>,
Scannable {
}
final class LambdaSubscriber<T>
implements InnerConsumer<T>,
Disposable {
}
// LambdaSubscriber 属于:Subscriber的实现类
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
// reactor.core.publisher.LambdaSubscriber
// 调用最终的subscribe
subscribe(subscriber);
return subscriber;
}
public final void subscribe(Subscriber<? super T> actual) {
// actual = reactor.core.publisher.LambdaSubscriber
// ***************************************************
// 把FluxArray转换成:CorePublisher
// this = FluxArray
// ***************************************************
CorePublisher publisher = Operators.onLastAssembly(this);
// ***************************************************
// 把reactor.core.publisher.LambdaSubscriber
// 转换成: CoreSubscriber
// ***************************************************
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
// 从上面的分析得了:publisher不是OptimizableOperator的子类
if (publisher instanceof OptimizableOperator) { // false
OptimizableOperator operator = (OptimizableOperator) publisher;
while (true) {
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
return;
}
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
} //end if
operator = newSource;
} //end while
} //end if
// ***************************************************
// reactor.core.publisher.FluxArray
// .subscribe(reactor.core.publisher.LambdaSubscriber s)
// ***************************************************
publisher.subscribe(subscriber);
} catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
public void subscribe(CoreSubscriber<? super T> actual) {
// actual = reactor.core.publisher.LambdaSubscriber
// array = ["1","2"]
subscribe(actual, array);
}
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
// s = reactor.core.publisher.LambdaSubscriber
// array = ["1","2"]
if (array.length == 0) { // false
Operators.complete(s);
return;
}// end if
if (s instanceof ConditionalSubscriber) { // false
s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
} else {
// *********************Subscriber.onSubscribe*************************
// reactor.core.publisher.LambdaSubscriber.onSubscribe
//
// **********************************************
s.onSubscribe(new ArraySubscription<>(s, array));
} //end else
}
// reactor.core.publisher.FluxArray$ArraySubscription
public final void onSubscribe(Subscription s) {
// s = reactor.core.publisher.FluxArray$ArraySubscription
// s包裹着:数组和LambdaSubscriber
if (Operators.validate(subscription, s)) { // true
// 订阅者
this.subscription = s;
if (subscriptionConsumer != null) { // false
try {
subscriptionConsumer.accept(s);
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
onError(t);
}
} else {
// s = FluxArray$ArraySubscription
s.request(Long.MAX_VALUE);
} //end else
} //end if
}
public void request(long n) {
// Long.MAX_VALUE
if (Operators.validate(n)) { // true
if (Operators.addCap(REQUESTED, this, n) == 0) { // true
if (n == Long.MAX_VALUE) { // true
// invoker fastPath
fastPath();
} else {
slowPath(n);
}
} //end if
} //end if
}
void fastPath() {
// a = [1, 2]
final T[] a = array;
// len = 2
final int len = a.length;
// s = reactor.core.publisher.LambdaSubscriber
final Subscriber<? super T> s = actual;
// 遍历数组
for (int i = index; i != len; i++) {
// 是否设置了取消
if (cancelled) {
return;
}
// 数组第N个元素
T t = a[i];
if (t == null) {
// 为空的情况下调用:onError
s.onError(new NullPointerException("The " + i + "th array element was null"));
return;
}
// ************************************
// onNext调用
// ************************************
// 最后调用onNext方法
s.onNext(t);
}// end for
if (cancelled) {
return;
} //end if
// 最后调用:onCompelete()方法
s.onComplete();
}