这篇文章来源于我最近在接触响应式编程,了解到reactive streams的规范。发现里面的好多术语,或者概念我完全无法理解。但是总是在不经意间的灵光乍现,想到一些比较奇怪的想法,我怕这种感觉会消失,所以记录下来那一瞬间我的真实想法。也就是此时此刻,我对响应式的一些认知。或许随着时间的推移,这种感觉会被推翻,但是我想记录下我的成长历程。

一、什么是“调度”(Scheduling)?

在接触了project Reactor之后呢 我总是听到类似这样的内容:开发者不需要自己管理线程,reactor会有调度器帮我们自动管理。但是我是理解不上去的,任务在哪个线程执行,什么时候执行,啥叫调度啊。一时间很多概念闯进来。

在异步编程中,调度 指的是:

比如我们在webflux中总是写下面的这段代码:

Mono.just("Hello") .map(s -> s + " World") .subscribe(System.out::println);
  • .map()中的代码 和 System.out::println 是你要执行的逻辑。
  • 但谁来执行它们?在哪个线程上执行?
  • 这就是 调度 要解决的问题。

二、Reactor 的“调度器”(Scheduler)是什么?

Scheduler 是 Reactor 提供的一个 抽象接口,用来:

你可以把它理解为 Java 中 ExecutorService 的响应式版本

我去源码中截了个图片:看到了Scheduler中的schedule方法签名,就是接收个Runable的任务,将来我去调度到某个线程中去执行,但是具体实现交给实现类了。这是顶级抽象。 image.png

subcribeOn 和 publishOn

所以我们去把任务去调度的时候,总是调用类似下面的代码。

Mono.just("data")
    .map(a -> processInThread1(a))          // 阶段1
    .publishOn(Schedulers.parallel())       // 切换点
    .map(b -> processInThread2(b))          // 阶段2
    .subscribe(System.out::println);        // 阶段3

publishOn(Schedulers.parallel()): 通过名称我们来分析这个方法的意思就好像是说 把数据发布到Scheduler指定的线程。

我们来说说publishOn。首先我应该能清楚的是,每一个算子都是一个publisher,因为它是链式调用的嘛,所以数据经过一个算子之后,一定要往下流。并且作为中间的算子,它本身是没有数据的,数据来源于上游。所以每个算子的内部要封装了一个Subscriber.因为在reactive streams规范中,数据是publisher通过onNext向下传递的。

总能听到订阅是向上订阅,数据是向下流动,怎么理解?

Reactor 内部会构建一个 订阅链(Subscriber Chain),“订阅链”本质上是一个由 Subscriber 构成的“回调链表” ,数据要想从头向下流动,肯定是要进行订阅的,订阅就需要订阅者。所以我们平常用的操作符(如 mapfilter)都包装一个 Subscriber,当你调用 .subscribe() 时,它从链的末端开始,逐级向上游触发 .subscribe() ,形成“向上订阅”。

一、先看一个简单的例子

Mono.just("hello")
    .map(s -> s + "-1")
    .map(s -> s + "-2")
    .subscribe(System.out::println);

每个操作符都返回一个新的 Mono,当你调用map的时候。

Reactor 内部做了什么?

// 伪代码:map 操作符的实现
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
    return new MonoMap<>(this, mapper); // 返回一个新的 Mono
}
  • this 是上游的 Mono(比如 Mono.just),就是调用.map的那个对象,那肯定就是上游的mono实例

  • MonoMap 是一个包装类,它持有:

    • 上游 source
    • 当前操作的函数 mapper

所以整个链就变成了如下的感觉:

Mono<String> source = new MonoJust<>("hello");
Mono<String> mapped1 = new MonoMap<>(source, s -> s + "-1");
Mono<String> mapped2 = new MonoMap<>(mapped1, s -> s + "-2");

所以最底层的mapped2,持有了上层的mapped1,而mapped1持有了上层的source

结构如下:从底层逐渐走向上层。这就是向上订阅

mapped2 → mapped1 → source

订阅触发

当你调用:

mapped2.subscribe(System.out::println);

mapped2(即 MonoMap)创建一个自己的 Subscriber,因为它作为一个中间算子,虽然是一个publisher(因为Mono和Flux都是数据的发布者),那么它就要发布数据,数据从哪里来呢,肯定是从上游过来。这就决定了每个算子的内部都要包装一个subcriber.

每个 MonoMap 长什么样?
class MonoMap<T, R> extends Mono<R> {
    final Publisher<T> source;        // 上游的 Publisher
    final Function<T, R> mapper;      // 当前 map 函数

    public void subscribe(Subscriber<? super R> actual) {
        // actual 是下游的 Subscriber
        MapSubscriber<T, R> parent = new MapSubscriber<>(actual, mapper);
        source.subscribe(parent); // 向上游发起订阅
    }
}

所以 MonoMap 的作用是:

  • 拿到下游的 Subscriber----actual
  • 包装成一个 MapSubscriber
  • 然后让上游去订阅这个 MapSubscriber
步骤 1:mapped2(即 MonoMap)创建一个自己的 Subscriber
// 在 MonoMap(mapper2) 中
Subscriber<? super R> actual = System.out::println;

// 创建一个 MapSubscriber,它会执行 mapper2,并把结果传给 actual
MapSubscriber<T, R> parent = new MapSubscriber<>(actual, mapper2);
// parent 的作用:拿到数据 → 执行 s -> s + "-2" → 传给 System.out::println

// 向上游订阅:让 mapped1 去订阅 parent
source.subscribe(parent); // source = mapped1

这里有几点说明

  1. source = mapped1,就是上游的Mono,因为我们构建链条的时候,说过底层的算子会持有上游的算子
  2. mapper2 就是构建时候传递的 s -> s + "-2"
  3. 构建MapSubscriber的时候需要把mapper2传递进去,因为它要把上游来的数据先处理下,在传递给下游

下面的代码就截取自FluxMap中的MapSubscriber,它的作用就是上游数据来的时候,先mapper,再把mapper的结果传递给下游 image.png

image.png

步骤 2:mapped1 接到订阅,继续向上
// mapped1 的 subscribe()
public void subscribe(Subscriber<? super T> actual) {
    // actual = MapSubscriber(println, mapper2)
    MapSubscriber<T, R> parent = new MapSubscriber<>(actual, mapper1);

    source.subscribe(parent); // source = MonoJust("hello")
}
步骤 3:源头 MonoJust 最终被订阅
// MonoJust的subscribe()
public void subscribe(Subscriber<? super T> actual) {
    // actual = MapSubscriber(MapSubscriber(println, mapper2), mapper1)

    // 直接触发数据流
    actual.onSubscribe(Operators.emptySubscription());
    actual.onNext(value);  // "hello"
    actual.onComplete();
}
四、数据流动:从源头“向下”传递

源头触发:

actual.onNext("hello");

此时的 actual 是最上游的 MapSubscriber,它的 onNext 实现是:

// MapSubscriber.onNext()
public void onNext(T t) {
    R mapped = mapper.apply(t); // 执行 map 函数
    actual.onNext(mapped);      // 向下游传递
}

所以流程是:

MonoJust: onNext("hello")
    ↓
MapSubscriber1: mapper1("hello") → "hello-1"
    ↓
MapSubscriber2: mapper2("hello-1") → "hello-1-2"
    ↓
System.out::println("hello-1-2")

说了这么多 我们发现

  • 每个操作符内部都创建一个 Subscriber 的实现类(如 MapSubscriber)。用来去订阅上游发布者的数据
  • 每个Subscriber内部还要包含一个下游的Subscriber,将处理好的数据(mapper)发送给下游的订阅者
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]