在上一篇中,我们已经对reactor有了一个简单的认识,这篇我们将详细地介绍一些Reactor常见的operator。

buffer

buffer操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// buffer 20个,输出5 个包含 20 个元素的数组
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
// buffer1秒,输出2个包括10个元素的数组
Flux.interval(Duration.ofMillis(100)).buffer(Duration.ofMillis(1001)).take(2).subscribe(System.out::println);
// buffer 10个,500毫秒超时
Flux.interval(Duration.ofMillis(100)).bufferTimeout(10, Duration.ofMillis(500)).take(2).subscribe(System.out::println);
// bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;
// 输出的是 5 个包含 2 个元素的数组
// 每当遇到一个偶数就会结束当前的收集
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
// bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。
// 输出的是 5 个包含 1 个元素的数组
// 数组里面包含的只有偶数
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

// 第四行语句输出的是 5 个包含 1 个元素的数组

filter

filter操作符对流中的元素进行过滤,只留下满足 Predicate 指定条件的元素。

1
2
//下面代码的输出的是 1 到 10 中的所有偶数。
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

window

window操作符把流中的元素收集到另外的Flux 序列中。window操作符的作用类似于buffer,所不同的是window操作符是把当前流中的元素收集到另外的Flux序列中,因此返回值类型是 Flux

1
2
3
4
5
6
7
8
9
 Flux.range(1, 100).window(20).subscribe(item -> {
System.out.println("buffer handle start");
item.subscribe(System.out::println);
});

Flux.interval(Duration.ofMillis(100)).window(Duration.ofMillis(1001)).take(2).subscribe(item -> {
System.out.println("buffer handle begin");
item.subscribe(System.out::println);
});

reduce

reduce和reduceWith操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的Mono序列。

1
2
3
4
//输出5050
Flux.range(1, 100).reduce(Integer::sum).subscribe(System.out::println);
//下面的示例代码中也是同样的相加操作,不过通过Supplier给出了初始值 100,所以结果为 5050 + 100 = 5150;
Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);

zipWith

zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。

1
2
3
4
5
//合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;
Flux<Tuple2<Integer, Integer>> tupleResults = Flux.range(1, 100).zipWith(Flux.range(101, 100));
tupleResults.subscribe(t -> System.out.println(t.getT1() + t.getT2()));
//zipWith 操作符也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值
Flux.range(1, 100).zipWith(Flux.range(101, 100), Integer::sum).subscribe(System.out::println);

take

take系列操作符用来从当前流中提取元素。

1
2
3
4
5
6
7
8
9
10
//取前10个,输出的是数字 1 到 10
Flux.range(1, 1000).take(10).subscribe(System.out::println);
// 取最后10个,输出的是数字 991 到 1000
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
//提取元素直到 Predicate 返回 true(Predicate 返回 true 的元素也是包含在内的),输出的是数字 1 到 10
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
//当 Predicate 返回 true 时才进行提取,输出的是数字 1 到 9
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
//提取元素直到另外一个流开始产生元素,输出的数字0到9
Flux.interval(Duration.ofMillis(100)).takeUntilOther(Flux.interval(Duration.ofMillis(1001))).subscribe(System.out::println);

merge

merge和mergeSequential操作符用来把多个流合并成一个Flux序列。不同之处在于 :

  • merge按照所有流中元素的实际产生顺序来合并
  • mergeSequential则按照所有流被订阅的顺序,以流为单位进行合并。
1
2
3
4
5
6
// 输出00 11 22 33 44
Flux.merge(Flux.interval(Duration.ofMillis(100)).take(5), Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5))
.subscribe(System.out::println);
// 输出0 1 2 3 4 0 1 2 3 4
Flux.mergeSequential(Flux.interval(Duration.ofMillis(100)).take(5), Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5))
.subscribe(System.out::println);

map

map操作用来将序列转换成另一个类型的序列。

1
2
//int类型转换成String类型
Flux.range(1, 10).map(v -> "value is " + v).subscribe(System.out::println);

flatMap

flatMap和flatMapSequential操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential和flatMap之间的区别与 mergeSequential 和 merge 之间的区别是一样的:

  • flatMap 按照所有流中元素的实际产生顺序来合并。
  • flatMapSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
1
2
3
4
5
6
// 输出0 1 0 2 1 2
Flux.range(1, 2).flatMap(v -> Flux.interval(Duration.ofMillis(100L * v)).take(3)).subscribe(System.out::println);
TimeUnit.SECONDS.sleep(4);
// 输出0 1 2 0 1 2
Flux.range(1, 2).flatMapSequential(v -> Flux.interval(Duration.ofMillis(100L * v)).take(3)).subscribe(System.out::println);
TimeUnit.SECONDS.sleep(4);

concatMap

concatMap操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与flatMap不同的是,concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并;与flatMapSequential不同的是,concatMap对转换之后的流的订阅是动态进行的,而flatMapSequential在合并之前就已经订阅了所有的流。

1
2
3
//输出0 1 2 0 1 2
Flux.range(1, 2).concatMap(v -> Flux.interval(Duration.ofMillis(100L * v)).take(3)).subscribe(System.out::println);
TimeUnit.SECONDS.sleep(4);

combineLatest

combineLatest操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

Flux.combineLatest(
Arrays::toString,
Flux.intervalMillis(100).take(5),
Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);

1
2
3
4
5
6
7
//0  1 2
// 0 1 2
// 0 1 23 4
//输出0,1,2,3,4
Flux.combineLatest(Flux.interval(Duration.ofMillis(100L)).take(3), Flux.interval(Duration.ofMillis(10L), Duration.ofMillis(200L)).take(3), (v1, v2) -> v1 + v2)
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(4);

concatWith

将另一个流连接到当前流后面

1
2
// 输出0 1 2 0 1 2
Flux.interval(Duration.ofMillis(100)).take(3).concatWith(Flux.interval(Duration.ofMillis(100)).take(3)).subscribe(System.out::println);

retry

当出现错误时,可以通过 retry 操作符来进行重试。重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。

1
2
3
4
5
//下面的代码指定了重试次数为 1,所输出的结果是 0 1 2 0 1 2和exception
Flux.range(0, 3)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);

log

log操作符将流相关的事件记录在日志中。下面的代码添加了 log 操作符并指定了日志分类的名称:

1
Flux.range(1, 2).log("Range Seq").subscribe(System.out::println);

输出:

22:16:12.950 [main] INFO Range Seq - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
22:16:12.952 [main] INFO Range Seq - | request(unbounded)
22:16:12.952 [main] INFO Range Seq - | onNext(1)
1
22:16:12.953 [main] INFO Range Seq - | onNext(2)
2
22:16:12.953 [main] INFO Range Seq - | onComplete()