Reactor3 源码解析四: FluxCreate源码剖析

论坛 期权论坛 编程之家     
选择匿名的用户   2021-5-21 16:20   11   0

FluxCreate与FluxArray一样implements SourceProducer,都是最底层的生产者(非中间层)。与FluxArray最明显的不同是,FluxCreate实现背压backpressure的功能(背压是响应式编程的一个亮点)。

调用方式

    Flux.create(sink -> {
             //向下游发布元素
      for (int i = 0; i <= 100; i++) {
      sink.next(i);
      }
            //结束发布元素
            sink.complete();
         })
    .subscribe(System.out :: println);

源码方法

final class FluxCreate<T> extends Flux<T> implements SourceProducer<T> {

 enum CreateMode {
  PUSH_ONLY, PUSH_PULL
 }
    
    //这个source后面会被封装成sink(发射器),主要用来生产元素
 final Consumer<? super FluxSink<T>> source;

    //背压策略,本节先不详解
 final OverflowStrategy backpressure;

    //创建模式,本节先不详解
 final CreateMode createMode;

 FluxCreate(Consumer<? super FluxSink<T>> source,
   FluxSink.OverflowStrategy backpressure,
   CreateMode createMode) {
  this.source = Objects.requireNonNull(source, "source");
  this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
  this.createMode = createMode;
 }

 static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t,
   OverflowStrategy backpressure) {
  switch (backpressure) {
   case IGNORE: {
    return new IgnoreSink<>(t);
   }
   case ERROR: {
    return new ErrorAsyncSink<>(t);
   }
   case DROP: {
    return new DropAsyncSink<>(t);
   }
   case LATEST: {
    return new LatestAsyncSink<>(t);
   }
   default: {
    return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
   }
  }
 }

 @Override
 public void subscribe(CoreSubscriber<? super T> actual) {
        //创建发射器sink,默认使用BufferAsyncSink,发射器同时也扮演Subscription的角色
  BaseSink<T> sink = createSink(actual, backpressure);

  actual.onSubscribe(sink);
        //FluxArray主要是通过最底层的LambdaSubscriber,在onSubscriber时,request(n),从而触发生产者向下游传递元素。与FluxArray等不同,这里主动触发发射器向下游传递元素。
  try {
   source.accept(
     createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
       sink);
  }
  catch (Throwable ex) {
   Exceptions.throwIfFatal(ex);
   sink.error(Operators.onOperatorError(ex, actual.currentContext()));
  }
 }
}

FluxArray是通过ArraySubscription的request操作触发元素向下游传递并消费,而FluxCreate显然没有那么简单。

链路一: FluxCreate.subscribe(LambdaSubscriber) -> LambdaSubscriber.onSubscribe(BufferAsyncSink) -> BufferAsyncSink.request(n)

  @Override
  public final void request(long n) {
   if (Operators.validate(n)) {
    Operators.addCap(REQUESTED, this, n);

    LongConsumer consumer = requestConsumer;
    if (n > 0 && consumer != null && !isCancelled()) {
     consumer.accept(n);
    }
    onRequestedFromDownstream();
   }
  }

  void onRequestedFromDownstream() {
   drain();
  }

        void drain() {
   if (WIP.getAndIncrement(this) != 0) {
    return;
   }

   final Subscriber<? super T> a = actual;
   final Queue<T> q = queue;

   for (; ; ) {
    long r = requested;
    long e = 0L;

    while (e != r) {
     if (isCancelled()) {
      Operators.onDiscardQueueWithClear(q, ctx, null);
      if (WIP.decrementAndGet(this) != 0) {
       continue;
      }
      else {
       return;
      }
     }

     boolean d = done;

     T o = q.poll();

     boolean empty = o == null;

     if (d && empty) {
      Throwable ex = error;
      if (ex != null) {
       super.error(ex);
      }
      else {
       super.complete();
      }
      return;
     }

     if (empty) {
      break;
     }

     a.onNext(o);

     e++;
    }

    if (e == r) {
     if (isCancelled()) {
      Operators.onDiscardQueueWithClear(q, ctx, null);
      if (WIP.decrementAndGet(this) != 0) {
       continue;
      }
      else {
       return;
      }
     }

     boolean d = done;

     boolean empty = q.isEmpty();

     if (d && empty) {
      Throwable ex = error;
      if (ex != null) {
       super.error(ex);
      }
      else {
       super.complete();
      }
      return;
     }
    }

    if (e != 0) {
     Operators.produced(REQUESTED, this, e);
    }

    if (WIP.decrementAndGet(this) == 0) {
     break;
    }
   }
  }

核心逻辑是在 BufferAsyncSink#drain 中,drain主要是从一个队列中poll元素,向下游传递消费,此时尚未触发sink的next操作,所以队列中无元素,直接return. 继续向下走,source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);这里触发了元素的发射。

链路二: source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);

这里实际上触发了测试类中的如下操作

            sink -> {
             //向下游发布元素
      for (int i = 0; i <= 100; i++) {
      sink.next(i);
      }
            //结束发布元素
            sink.complete();
         }

继续看下SerializedSink#next是如何向下游传递元素并消费的。

        @Override
  public FluxSink<T> next(T t) {
   Objects.requireNonNull(t, "t is null in sink.next(t)");
   if (sink.isTerminated() || done) {
    Operators.onNextDropped(t, sink.currentContext());
    return this;
   }
   if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
    try {
                    //装饰模式 实质调用的  BufferAsyncSink#next
     sink.next(t);
    }
    catch (Throwable ex) {
     Operators.onOperatorError(sink, ex, t, sink.currentContext());
    }
    if (WIP.decrementAndGet(this) == 0) {
     return this;
    }
   }
   else {
    this.mpscQueue.offer(t);
    if (WIP.getAndIncrement(this) != 0) {
     return this;
    }
   }
   drainLoop();
   return this;
  }

BufferAsyncSink#next

  @Override
  public FluxSink<T> next(T t) {
   queue.offer(t);
   drain();
   return this;
  }

        void drain() {
            //触发从队列中取元素并向下传递消费
        }

到这里,整个流程应该大体清晰了。相对于FluxArray, FluxCreate并未直接生产元素并向下游传递,而是先将将元素提交到队列,同时在next和request时适机触发从队列中poll元素向下游传递,因本例未使用异步线程,因此执行效果类似于FluxArray。 在增加线程调度后,配合背压策略,FluxCreate的优势将会凸显出来。线程调度将会后续介绍。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP