image
Flux<T>是一个标准Publisher<T>,表示0到N个发射项的异步序列,可选地以完成信号或错误终止。与Reactive Streams规范中一样,这三种类型的信号转换为对下游订阅者的onNext、onComplete或onError方法的调用。在这种大范围的可能信号中,Flux是通用的reactive 类型。注意,所有事件,甚至终止事件,都是可选的:没有onNext事件,但是onComplete事件表示一个空的有限序列,但是移除onComplete并且您有一个无限的空序列(除了关于取消的测试之外,没有特别有用)。同样,无限序列不一定是空的。例如,Flux.interval(Duration) 产生一个Flux<Long>,它是无限的,从时钟发出规则的数据。Mono发射0到1个元素的异步"发射器
image

以编程方式创建具有多次发射能力的Flux,元素通过FluxSink API以同步或异步方式进行。
eg:
Flux.create((t) -> { t.next("create"); t.next("create1"); t.complete();}).subscribe(System.out::println);
generate
以编程方式创建一个的Flux,通过consumer回调逐一生成信号;generate中next只能调1次,否则会报错 reactor.core.Exceptions$ErrorCallbackNotImplemented
image
eg:
Flux.generate(t -> { t.next("generate"); //注意generate中next只能调用1次 t.complete();}).subscribe(System.out::println);
just
创建一个Flux,它发出所提供的元素,然后完成。
image
eg:
//单个元素Flux.just("just").subscribe(System.out::println);//多个元素Flux.just("just", "just1", "just2").subscribe(System.out::println);
from
用Flux API装饰指定的Publisher,通过Publisher创建一个Flux
image
eg:
//Flux->FluxFlux.from(Flux.just("just", "just1", "just2")).subscribe(System.out::println);//Mono->MonoFlux.from(Mono.just("just")).subscribe(System.out::println);
fromArray
创建一个Flux,它发出包含在提供的数组中的项。
image
eg:
Flux.fromArray(new String[] { "arr", "arr", "arr", "arr" }).subscribe(System.out::println);
fromIterable
创建一个个Flux,它发出所提供的Iterable中包含的项。将为每个subscriber创建一个新的Iterable。
image
eg:
Set<String> v = new HashSet<>();v.add("1");v.add("2");v.add("3");Flux.fromIterable(() -> v.iterator()).subscribe(System.out::println);
fromStream
创建一个Flux,它发出所提供的Stream中包含的项。请记住,Stream不能被重新使用,这可能是有问题的。多订阅或重订阅的情况(如repeat或retry)Stream是closed由操作员取消,错误或完成。
image
defer每当对得到的Flux进行Subscription时,延迟提供Publisher,因此实际的源实例化被推迟,直到每个订阅和Supplier可以创建订阅者特定的实例。但是,如果供应商没有生成新的实例,这个操作符将有效地从Publisher起作用。
image
eg:
Flux.defer(() -> Flux.just("just", "just1", "just2")).subscribe(System.out::println);
interval
创建一个Flux,它以0开始发射长值并递增全局计时器上指定的时间间隔。如果需求没有及时产生,一个OnError将用来发出信号。IllegalStateException详细说明无法发出的信息。在正常情况下,Flux将永远不会完成。
image
eg:
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).subscribe(System.out::println);//防止程序过早退出,放一个CountDownLatch拦住CountDownLatch latch = new CountDownLatch(1);latch.await();// System.in.read(); 更好
empty
创建一个Flux,完成而不发射任何项目。
image
eg:
Flux.empty().subscribe(System.out::println);
error
创建一个Flux,它在订阅之后立即以指定的错误终止。
image
eg:
Flux.error(new RuntimeException()).subscribe(System.out::println);
never
创建一个Flux,它永远不会发出任何数据、错误或完成信号。
image
eg:
Flux.never().subscribe(System.out::println);
range
建立一个Flux,它只会发出一个count递增整数的序列,从start开始。也就是说,在start(包含)和start + count(排除)之间发出整数,然后完成。
image
eg:
Flux.range(0, 100).subscribe(System.out::println);
作者:Mr_1214链接:https://www.jianshu.com/p/611f3667c4d2来源:简书
Java8日期操作
java对象大小
Java自定义锁的实现
Java创建对象方式2
Java创建对象方式
Java中Unsafe使用详解
深入理解Java内存模型
SpringBoot邮件发送示例
SpringBoot配置文件你了解多少?
SpringBoot多数据源配置详解
SpringBoot RabbitMQ消息可靠发送与接收
springboot mybatis jpa 实现读写分离