在上一篇中,我们已经对reactor有了一个简单的认识,这篇我们将详细地介绍一些Reactor常见的operator。
buffer buffer操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Flux.range(1 , 100 ).buffer(20 ).subscribe(System.out::println); Flux.interval(Duration.ofMillis(100 )).buffer(Duration.ofMillis(1001 )).take(2 ).subscribe(System.out::println); Flux.interval(Duration.ofMillis(100 )).bufferTimeout(10 , Duration.ofMillis(500 )).take(2 ).subscribe(System.out::println); Flux.range(1 , 10 ).bufferUntil(i -> i % 2 == 0 ).subscribe(System.out::println); Flux.range(1 , 10 ).bufferWhile(i -> i % 2 == 0 ).subscribe(System.out::println);
// 第四行语句输出的是 5 个包含 1 个元素的数组
filter filter操作符对流中的元素进行过滤,只留下满足 Predicate 指定条件的元素。
1 2 Flux.range(1 , 10 ).filter(i -> i % 2 == 0 ).subscribe(System.out::println);
window window操作符把流中的元素收集到另外的Flux 序列中。window操作符的作用类似于buffer,所不同的是window操作符是把当前流中的元素收集到另外的Flux序列中,因此返回值类型是 Flux。
1 2 3 4 5 6 7 8 9 Flux.range(1 , 100 ).window(20 ).subscribe(item -> { System.out.println("buffer handle start" ); item.subscribe(System.out::println); }); Flux.interval(Duration.ofMillis(100 )).window(Duration.ofMillis(1001 )).take(2 ).subscribe(item -> { System.out.println("buffer handle begin" ); item.subscribe(System.out::println); });
reduce reduce和reduceWith操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的Mono序列。
1 2 3 4 Flux.range(1 , 100 ).reduce(Integer::sum).subscribe(System.out::println); Flux.range(1 , 100 ).reduceWith(() -> 100 , Integer::sum).subscribe(System.out::println);
zipWith zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。
1 2 3 4 5 Flux<Tuple2<Integer, Integer>> tupleResults = Flux.range(1 , 100 ).zipWith(Flux.range(101 , 100 )); tupleResults.subscribe(t -> System.out.println(t.getT1() + t.getT2())); Flux.range(1 , 100 ).zipWith(Flux.range(101 , 100 ), Integer::sum).subscribe(System.out::println);
take take系列操作符用来从当前流中提取元素。
1 2 3 4 5 6 7 8 9 10 Flux.range(1 , 1000 ).take(10 ).subscribe(System.out::println); Flux.range(1 , 1000 ).takeLast(10 ).subscribe(System.out::println); Flux.range(1 , 1000 ).takeUntil(i -> i == 10 ).subscribe(System.out::println); Flux.range(1 , 1000 ).takeWhile(i -> i < 10 ).subscribe(System.out::println); Flux.interval(Duration.ofMillis(100 )).takeUntilOther(Flux.interval(Duration.ofMillis(1001 ))).subscribe(System.out::println);
merge merge和mergeSequential操作符用来把多个流合并成一个Flux序列。不同之处在于 :
merge按照所有流中元素的实际产生顺序来合并
mergeSequential则按照所有流被订阅的顺序,以流为单位进行合并。
1 2 3 4 5 6 Flux.merge(Flux.interval(Duration.ofMillis(100 )).take(5 ), Flux.interval(Duration.ofMillis(50 ), Duration.ofMillis(100 )).take(5 )) .subscribe(System.out::println); Flux.mergeSequential(Flux.interval(Duration.ofMillis(100 )).take(5 ), Flux.interval(Duration.ofMillis(50 ), Duration.ofMillis(100 )).take(5 )) .subscribe(System.out::println);
map map操作用来将序列转换成另一个类型的序列。
1 2 Flux.range(1 , 10 ).map(v -> "value is " + v).subscribe(System.out::println);
flatMap flatMap和flatMapSequential操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential和flatMap之间的区别与 mergeSequential 和 merge 之间的区别是一样的:
flatMap 按照所有流中元素的实际产生顺序来合并。
flatMapSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
1 2 3 4 5 6 Flux.range(1 , 2 ).flatMap(v -> Flux.interval(Duration.ofMillis(100L * v)).take(3 )).subscribe(System.out::println); TimeUnit.SECONDS.sleep(4 ); Flux.range(1 , 2 ).flatMapSequential(v -> Flux.interval(Duration.ofMillis(100L * v)).take(3 )).subscribe(System.out::println); TimeUnit.SECONDS.sleep(4 );
concatMap concatMap操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与flatMap不同的是,concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并;与flatMapSequential不同的是,concatMap对转换之后的流的订阅是动态进行的,而flatMapSequential在合并之前就已经订阅了所有的流。
1 2 3 Flux.range(1 , 2 ).concatMap(v -> Flux.interval(Duration.ofMillis(100L * v)).take(3 )).subscribe(System.out::println); TimeUnit.SECONDS.sleep(4 );
combineLatest combineLatest操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。
Flux.combineLatest( Arrays::toString, Flux.intervalMillis(100).take(5), Flux.intervalMillis(50, 100).take(5) ).toStream().forEach(System.out::println);
1 2 3 4 5 6 7 Flux.combineLatest(Flux.interval(Duration.ofMillis(100L )).take(3 ), Flux.interval(Duration.ofMillis(10L ), Duration.ofMillis(200L )).take(3 ), (v1, v2) -> v1 + v2) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(4 );
concatWith 将另一个流连接到当前流后面
1 2 Flux.interval(Duration.ofMillis(100 )).take(3 ).concatWith(Flux.interval(Duration.ofMillis(100 )).take(3 )).subscribe(System.out::println);
retry 当出现错误时,可以通过 retry 操作符来进行重试。重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。
1 2 3 4 5 Flux.range(0 , 3 ) .concatWith(Mono.error(new IllegalStateException())) .retry(1 ) .subscribe(System.out::println);
log log操作符将流相关的事件记录在日志中。下面的代码添加了 log 操作符并指定了日志分类的名称:
1 Flux.range(1 , 2 ).log("Range Seq" ).subscribe(System.out::println);
输出:
22:16:12.950 [main] INFO Range Seq - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 22:16:12.952 [main] INFO Range Seq - | request(unbounded) 22:16:12.952 [main] INFO Range Seq - | onNext(1) 1 22:16:12.953 [main] INFO Range Seq - | onNext(2) 2 22:16:12.953 [main] INFO Range Seq - | onComplete()