响应式编程是一种编程范式,强调数据流和变化传播。其核心理念是通过观察者模式来管理异步数据流,使得程序能够自动响应数据的变化。以下是响应式编程的几个关键特点:
- 数据流:数据被视为流,程序可以在流中定义操作,处理数据的变化。
- 异步处理:响应式编程通常与异步操作结合,允许程序在等待数据的同时继续执行其他任务。
- 变化传播:当数据源发生变化时,所有依赖于该数据的部分会自动更新,减少了手动管理状态的复杂性。
- 背压:允许消费者控制数据流速,以避免过载。
- 组合性:支持将多个数据流组合在一起进行处理,提供了灵活的操作。
FIT 提供了多个接口以供开发者使用流式功能,有Choir和Solo两种响应式流,开发者可以使用 FIT 提供的默认实现进行简单的响应式编程,以下是一些例子,帮助您快速入门:
- 通过 range 函数创建 Choir 响应式流并发布。
//result:0, 2, 4
List<Integer> actual = Choir.range(0, 5, 2).blockAll();- 将 Choir 响应式流数据转换为列表数据,
buffer()函数将数据按照指定大小进行分组,转换成列表数据后继续发送,blockAll()函数订阅响应式流,无需开发者指定subscriber。
//result:[1, 2], [3]
List<List<Integer>> lists = Choir.just(1, 2, 3).buffer(2).blockAll();- 使用 map 方法转换 Choir 响应式流。
//result:"1", "2", "3"
List<String> actual = Choir.just(1, 2, 3).map(String::valueOf).blockAll();- 当响应式流仅有 0~1 个数据时,可以使用 Solo 响应式流。
//result:1
Solo.just(1).filter(value -> value % 2 == 1).subscribe(subscriber);- 使用 map 方法转换 Solo 响应式流
//result:"1"
Optional<String> actual = Solo.just(1).map(String::valueOf).block();- Solo 响应式流可以转换为 Choir 响应式流
//result:1
List<Integer> actual = Solo.just(1).toChoir().blockAll();- 通过数组创建 Choir 响应式流,并筛选其中的奇数,最后将该响应流的数据发布给订阅者,此处订阅者
subscriber需自定义,在 10.3.5 将会介绍subscriber的用法。
//result:1
Choir.just(0, 1, 2).filter(value -> value % 2 == 1).subscribe(subscriber);- 通过数组创建 Choir 响应式流,并发布给订阅者,其中,使用 Lambda 表达式快速定义
subscriber,无需进行额外实现,其中,BiConsumer<Subscription, T>表示响应式流中的数据消费时的行为,此处为(subscription, integer)。
//result:0, 1, 2, 3, 4
void temp() {
Choir.just(0, 1, 2, 3, 4).subscribe((subscription, integer) -> {
System.out.println(integer);
});
}除了 FIT 提供的默认实现,开发者也可以自定义实现Publisher、Subscriber、Subscription、Emitter来进行消息发布-订阅操作,实现复杂的响应式功能,本章节介绍响应流和各模块接口的具体定义,方便开发者进行响应式编程的自定义。
FIT 定义了Choir接口来进行流式数据的处理,它可以拥有多个数据,开发者可以将多种类型转换为响应式流,从而实现异步的响应式编程。
Choir可以用多种类型实例创建响应式流,以下均属于静态方法,返回一个创建后的 Choir响应式流实例:
| 接口 | 接口说明 |
|---|---|
| create(Consumer<Emitter<T>>) | 通过指定的 Emitter 的消费逻辑创建 Choir 响应式流 |
| empty() | 创建一个空的响应式流 |
| fromEmitter(Emitter<T>) | 将一个 Emitter 转换成 Choir 响应式流 |
| fromIterable(Iterable<T>) | 将一个 Iterable 转换成 Choir 响应式流 |
| fromPublisher(Publisher<T>) | 将一个 Publisher 适配成 Choir 响应式流 |
| just(T... array) | 将指定的数组转换成 Choir 响应式流 |
| range(int stop) | 从 1 开始,到指定的终止值(不包含)结束,间隔步长为 1,将所有整数转换成响应式流 |
| range(int start, int stop) | 从指定的起始值开始,到指定的终止值(不包含)结束,间隔步长为 1,将所有整数转换成响应式流 |
| range(int start, int stop, int step) | 从指定的起始值开始,到指定的终止值(不包含)结束,间隔指定的步长,将所有整数转换成响应式流 |
同时,Choir 提供了多种常用的数据处理方法:
| 接口 | 接口说明 |
|---|---|
| buffer(int size) | 将数据按照指定大小进行分组,转换成列表数据后继续发送 |
| count() | 计算响应式流中的元素数量后,将元素数量发送 |
| distinct() | 将上游元素去重后传递给下游 |
| filter(Predicate<T>) | 将每个数据按照指定方式判断是否符合要求,并将符合要求的数据继续发送 |
| first() | 仅保留第一个元素,并继续发送 |
| first(Predicate<T> filter) | 获取满足条件的第一个元素,并继续发送 |
| flatMap(Function<T, Publisher<R>>) | 将每个数据通过指定的方式转换为一个响应式流,并将各响应式流中的每个元素依次发送给下游 |
| map(Function<T, R>) | 将每个数据通过指定的方式进行转换后继续发送 |
| reduce(BinaryOperator<T>) | 将每个数据通过指定的方式进行合并后,形成一个新的数据,并继续发送 |
| skip(int count) | 从响应式流的开始跳过指定数量个元素,并继续发送 |
| subscribe() | 直接使用 EmptySubscriber 订阅响应式流 |
| subscribe(BiConsumer<Subscription, T>) | 使用 Lambda 表达式订阅响应式流,指定 响应式流中的数据消费时的行为,并开始消费响应式流中的数据。 |
| subscribe(Consumer<Subscription>, BiConsumer<Subscription, T> ,Consumer, BiConsumer) | 使用 Lambda 表达式订阅响应式流,指定响应式流被订阅时的行为,响应式流中的数据消费时的行为,响应式流正常终结时的行为和响应式流异常终结时的行为,并开始消费响应式流中的数据。 |
| blockAll() | 订阅响应式流,并阻塞等待所有结果 |
Solo接口的使用方式与Choir相似,但Solo响应式流只能拥有 0 或 1 个数据,以下均属于静态方法,返回一个创建后的Solo响应式流实例:
| 接口 | 接口说明 |
|---|---|
| create(Consumer<Emitter<T>>) | 通过指定的 Emitter 消费逻辑创建 Solo 响应式流 |
| empty() | 创建一个空的 Solo 响应式流 |
| fromEmitter(Emitter<T>) | 将一个 Emitter 转换成 Solo 响应式流 |
| fromPublisher(Publisher<T>) | 将一个 Publisher 适配成 Solo 响应式流 |
| just(T) | 将指定的数据转换成 Solo 响应式流 |
Solo 的常用数据处理接口方法如下:
| 接口 | 接口说明 |
|---|---|
| filter(Predicate<T>) | 将每个数据按照指定方式判断是否符合要求,并将符合要求的数据继续发送 |
| flatMap(Function<T, Publisher<R>>) | 将每个数据通过指定的方式转换为一个响应式流,并将各响应式流中的每个元素依次发送给下游 |
| map(Function<T, R>) | 将每个数据通过指定的方式进行转换后继续发送 |
| toChoir() | 将当前响应式流转换成 Choir |
| subscribe() | 直接使用 EmptySubscriber 订阅响应式流 |
| subscribe(BiConsumer<Subscription, T>) | 使用 Lambda 表达式订阅响应式流,指定 响应式流中的数据消费时的行为,并开始消费响应式流中的数据。 |
| subscribe(Consumer<Subscription>, BiConsumer<Subscription, T> ,Consumer, BiConsumer) | 使用 Lambda 表达式订阅响应式流,指定响应式流被订阅时的行为,响应式流中的数据消费时的行为,响应式流正常终结时的行为和响应式流异常终结时的行为,并开始消费响应式流中的数据。 |
| block() | 订阅响应式流,并阻塞等待结果 |
Emitter为数据发送者,开发者可以使用 FIT 提供的默认Emitter或自定义的Emitter创建一个响应式流,通过Emitter实例进行数据的流式发送,Emitter的接口方法定义如下:
| 接口 | 接口说明 |
|---|---|
| create() | 静态方法,创建一个默认的数据发送器 |
| emit(T data) | 发送一个指定的数据 |
| complete() | 发送一个正常终结信号 |
| fail(Exception cause) | 发送一个异常终结信号 |
| observe(Observer observer) | 添加一个观察者,用于观察数据发送者的一系列行为 |
其中,Observer属于内部接口,它表示Emitter的观察者,它含有以下定义:
| 接口 | 接口说明 |
|---|---|
| onEmittedData(T data) | 当 Emitter.emit(Object) 方法被调用时触发的事件 |
| onCompleted() | 当 Emitter.complete() 方法被调用时触发的事件 |
| onFailed(Exception cause) | 当 Emitter.fail(Exception) 方法被调用时触发的事件 |
为Emitter添加Observer后,可以观察Emitter的行为,并进行相应事件的触发,例如可以在Observer中添加各类日志等,降低代码的耦合性。
Emitter使用方法示例如下:
Emitter<Integer> emitter = Emitter.create();
emitter.emit(1);
List<Integer> l2 = new ArrayList<>();
Choir.fromEmitter(emitter).subscribe((subscription, i) -> {
l2.add(i);
subscription.cancel();
});
emitter.emit(2);
emitter.emit(3);此处使用了
Emitter接口的create()方法创建默认Emitter,开发者也可以选择自定义Emitter进行使用。
Publisher表示发布者,它负责产生数据流并发布给订阅者,开发者可以自定义一个Publisher,并通过Publisher实例创建Choir响应式流,Publisher可以产生一系列数量不限的的元素,并按照其订阅者的要求进行发布,一个发布者可为多个订阅者提供服务,其订阅者通过调用其subscribe(Subscriber)方法对其进行订阅。
以下是Publisher接口的定义:
public interface Publisher<T> {
/**
* 向发布者订阅以启动数据发送。
* <p>该方法可被多次执行,每次将为其订阅者产生一个新的 {@link Subscription},在订阅过程中发生的异常将通过
* {@link Subscriber#fail(Exception)} 进行传递。</p>
*
* @param subscriber 表示已订阅的 {@link Subscriber}{@code <}{@link T}{@code >}。
*/
void subscribe(Subscriber<T> subscriber);
}通过
Choir接口提供的静态创建方法创建的响应式流已默认实现Publisher,开发者无需在简单使用中自定义Publisher。
Subscriber表示订阅者,可以处理发布者所发布的数据,以及订阅开始、正常终结和异常终结信号。开发者可以自定义一个Subscriber来控制订阅者逻辑,Subscriber接口方法定义及使用说明如下:
onSubscribed(Subscription subscription):该方法表示订阅关系发生时的事件,它会在Publisher调用其subscribe(Subscriber)函数后被调用。consume(T data):该方法表示消费所订阅的数据,Publisher所发布的数据将由此方法进行消费,开发者需在此方法中定义数据的消费逻辑。complete():该方法表示此订阅正常终结,即当Publisher发送正常终结信号后,该Subscriber的任何方法将不会再被调用。isCompleted():该方法表示查询当前Subscriber是否已正常终结。fail(Exception cause):该方法表示此订阅异常终结,即当Publisher发送异常终结信号后,该Subscriber的任何方法将不会再被调用。isFailed():该方法表示查询当前Subscriber是否已经失败。empty():静态方法,获取一个空的订阅者。functional(...):静态方法,表示通过指定的 Lambda 表达式,获取一个订阅者。
发布者所发布元素数量不大于其通过
Subscription所请求的元素数量,发布者最多发布一个异常或数据结束事件。
在使用Subscriber时,开发者需要自定义Subscriber的订阅和数据消费逻辑,并创建实例订阅响应式流的数据,示例如下:
public class MySubscriber<T> extends EmptySubscriber<T> {
private Subscription subscription;
@Override
public void onSubscribed(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void consume(T data) {
System.out.println(data);
this.subscription.request(1);
}
@Override
protected void complete(@Nonnull Subscription subscription) {
System.out.println("completed");
}
@Override
public void fail(Exception cause) {
System.out.println("failed");
}
}使用该类进行消息的订阅,示例如下:
@Test
void temp() {
MySubscriber<Integer> subscriber = new MySubscriber<>();
Choir.just(0, 1, 2).subscribe(subscriber);
}结果如下:
0
1
2
completed
开发者需要自定义Subscriber的实现,同时应当在使用过程中正确发送
complete及fail信号。
Subscription表示Publisher和Subscriber之间的订阅关系,Subscription管理着 Publisher和Subscriber之间的数据流,同时提供了Subscriber向Publisher控制数据的相关方法。以下是Subscription定义的接口方法:
| 接口 | 接口说明 |
|---|---|
| request(long count) | 请求指定数量的数据。 |
| cancel() | 取消当前的订阅关系。 |
| isCancelled() | 判断当前订阅关系是否已经取消。 |
FIT 提供了FlowableException来指示响应式编程的异常,当用户使用自定义响应式编程时,可使用该类来抛出异常,该类方法如下:
FlowableException(String message):通过异常信息来初始化响应式编程框架的基础异常。FlowableException(Throwable cause):通过异常原因来初始化响应式编程框架的基础异常。FlowableException(String message, Throwable cause):通过异常信息和异常原因来初始化响应式编程框架的基础异常。