FluxCreate与FluxArray一样implements SourceProducer,都是最底层的生产者(非中间层)。与FluxArray最明显的不同是,FluxCreate实现背压backpressure的功能(背压是响应式编程的一个亮点)。
调用方式
Flux.create(sink -> {
//向下游发布元素
for (int i = 0; i <= 100; i++) {
sink.next(i);
}
//结束发布元素
sink.complete();
})
.subscribe(System.out :: println);
源码方法
final class FluxCreate<T> extends Flux<T> implements SourceProducer<T> {
enum CreateMode {
PUSH_ONLY, PUSH_PULL
}
//这个source后面会被封装成sink(发射器),主要用来生产元素
final Consumer<? super FluxSink<T>> source;
//背压策略,本节先不详解
final OverflowStrategy backpressure;
//创建模式,本节先不详解
final CreateMode createMode;
FluxCreate(Consumer<? super FluxSink<T>> source,
FluxSink.OverflowStrategy backpressure,
CreateMode createMode) {
this.source = Objects.requireNonNull(source, "source");
this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
this.createMode = createMode;
}
static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t,
OverflowStrategy backpressure) {
switch (backpressure) {
case IGNORE: {
return new IgnoreSink<>(t);
}
case ERROR: {
return new ErrorAsyncSink<>(t);
}
case DROP: {
return new DropAsyncSink<>(t);
}
case LATEST: {
return new LatestAsyncSink<>(t);
}
default: {
return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
}
}
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
//创建发射器sink,默认使用BufferAsyncSink,发射器同时也扮演Subscription的角色
BaseSink<T> sink = createSink(actual, backpressure);
actual.onSubscribe(sink);
//FluxArray主要是通过最底层的LambdaSubscriber,在onSubscriber时,request(n),从而触发生产者向下游传递元素。与FluxArray等不同,这里主动触发发射器向下游传递元素。
try {
source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
sink.error(Operators.onOperatorError(ex, actual.currentContext()));
}
}
}
FluxArray是通过ArraySubscription的request操作触发元素向下游传递并消费,而FluxCreate显然没有那么简单。
链路一: FluxCreate.subscribe(LambdaSubscriber) -> LambdaSubscriber.onSubscribe(BufferAsyncSink) -> BufferAsyncSink.request(n)
@Override
public final void request(long n) {
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
LongConsumer consumer = requestConsumer;
if (n > 0 && consumer != null && !isCancelled()) {
consumer.accept(n);
}
onRequestedFromDownstream();
}
}
void onRequestedFromDownstream() {
drain();
}
void drain() {
if (WIP.getAndIncrement(this) != 0) {
return;
}
final Subscriber<? super T> a = actual;
final Queue<T> q = queue;
for (; ; ) {
long r = requested;
long e = 0L;
while (e != r) {
if (isCancelled()) {
Operators.onDiscardQueueWithClear(q, ctx, null);
if (WIP.decrementAndGet(this) != 0) {
continue;
}
else {
return;
}
}
boolean d = done;
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.error(ex);
}
else {
super.complete();
}
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
if (isCancelled()) {
Operators.onDiscardQueueWithClear(q, ctx, null);
if (WIP.decrementAndGet(this) != 0) {
continue;
}
else {
return;
}
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.error(ex);
}
else {
super.complete();
}
return;
}
}
if (e != 0) {
Operators.produced(REQUESTED, this, e);
}
if (WIP.decrementAndGet(this) == 0) {
break;
}
}
}
核心逻辑是在 BufferAsyncSink#drain 中,drain主要是从一个队列中poll元素,向下游传递消费,此时尚未触发sink的next操作,所以队列中无元素,直接return. 继续向下走,source.accept( createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);这里触发了元素的发射。
链路二: source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);
这里实际上触发了测试类中的如下操作
sink -> {
//向下游发布元素
for (int i = 0; i <= 100; i++) {
sink.next(i);
}
//结束发布元素
sink.complete();
}
继续看下SerializedSink#next是如何向下游传递元素并消费的。
@Override
public FluxSink<T> next(T t) {
Objects.requireNonNull(t, "t is null in sink.next(t)");
if (sink.isTerminated() || done) {
Operators.onNextDropped(t, sink.currentContext());
return this;
}
if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
try {
//装饰模式 实质调用的 BufferAsyncSink#next
sink.next(t);
}
catch (Throwable ex) {
Operators.onOperatorError(sink, ex, t, sink.currentContext());
}
if (WIP.decrementAndGet(this) == 0) {
return this;
}
}
else {
this.mpscQueue.offer(t);
if (WIP.getAndIncrement(this) != 0) {
return this;
}
}
drainLoop();
return this;
}
BufferAsyncSink#next
@Override
public FluxSink<T> next(T t) {
queue.offer(t);
drain();
return this;
}
void drain() {
//触发从队列中取元素并向下传递消费
}
到这里,整个流程应该大体清晰了。相对于FluxArray, FluxCreate并未直接生产元素并向下游传递,而是先将将元素提交到队列,同时在next和request时适机触发从队列中poll元素向下游传递,因本例未使用异步线程,因此执行效果类似于FluxArray。 在增加线程调度后,配合背压策略,FluxCreate的优势将会凸显出来。线程调度将会后续介绍。 |