Skip to content

Latest commit

 

History

History
281 lines (214 loc) · 14.3 KB

File metadata and controls

281 lines (214 loc) · 14.3 KB

10.1 响应式编程概述

响应式编程是一种编程范式,强调数据流和变化传播。其核心理念是通过观察者模式来管理异步数据流,使得程序能够自动响应数据的变化。以下是响应式编程的几个关键特点:

  1. 数据流:数据被视为流,程序可以在流中定义操作,处理数据的变化。
  2. 异步处理:响应式编程通常与异步操作结合,允许程序在等待数据的同时继续执行其他任务。
  3. 变化传播:当数据源发生变化时,所有依赖于该数据的部分会自动更新,减少了手动管理状态的复杂性。
  4. 背压:允许消费者控制数据流速,以避免过载。
  5. 组合性:支持将多个数据流组合在一起进行处理,提供了灵活的操作。

10.2 FIT 流式功能简单使用

FIT 提供了多个接口以供开发者使用流式功能,有ChoirSolo两种响应式流,开发者可以使用 FIT 提供的默认实现进行简单的响应式编程,以下是一些例子,帮助您快速入门:

  1. 通过 range 函数创建 Choir 响应式流并发布。
//result:0, 2, 4
List<Integer> actual = Choir.range(0, 5, 2).blockAll();
  1. 将 Choir 响应式流数据转换为列表数据,buffer()函数将数据按照指定大小进行分组,转换成列表数据后继续发送,blockAll()函数订阅响应式流,无需开发者指定subscriber
//result:[1, 2], [3]
List<List<Integer>> lists = Choir.just(1, 2, 3).buffer(2).blockAll();
  1. 使用 map 方法转换 Choir 响应式流。
//result:"1", "2", "3"
List<String> actual = Choir.just(1, 2, 3).map(String::valueOf).blockAll();
  1. 当响应式流仅有 0~1 个数据时,可以使用 Solo 响应式流。
//result:1
Solo.just(1).filter(value -> value % 2 == 1).subscribe(subscriber);
  1. 使用 map 方法转换 Solo 响应式流
//result:"1"
Optional<String> actual = Solo.just(1).map(String::valueOf).block();
  1. Solo 响应式流可以转换为 Choir 响应式流
//result:1
List<Integer> actual = Solo.just(1).toChoir().blockAll();
  1. 通过数组创建 Choir 响应式流,并筛选其中的奇数,最后将该响应流的数据发布给订阅者,此处订阅者subscriber需自定义,在 10.3.5 将会介绍subscriber的用法。
//result:1
Choir.just(0, 1, 2).filter(value -> value % 2 == 1).subscribe(subscriber);
  1. 通过数组创建 Choir 响应式流,并发布给订阅者,其中,使用 Lambda 表达式快速定义subscriber,无需进行额外实现,其中,BiConsumer<Subscription, T>表示响应式流中的数据消费时的行为,此处为(subscription, integer)
//result:0, 1, 2, 3, 4
void temp() {
    Choir.just(0, 1, 2, 3, 4).subscribe((subscription, integer) -> {
        System.out.println(integer);
    });
}

10.3 FIT 流式功能接口

除了 FIT 提供的默认实现,开发者也可以自定义实现PublisherSubscriberSubscriptionEmitter来进行消息发布-订阅操作,实现复杂的响应式功能,本章节介绍响应流和各模块接口的具体定义,方便开发者进行响应式编程的自定义。

10.3.1 Choir 响应式流

FIT 定义了Choir接口来进行流式数据的处理,它可以拥有多个数据,开发者可以将多种类型转换为响应式流,从而实现异步的响应式编程。

Choir可以用多种类型实例创建响应式流,以下均属于静态方法,返回一个创建后的 Choir响应式流实例:

接口 接口说明
create(Consumer<Emitter<T>>) 通过指定的 Emitter 的消费逻辑创建 Choir 响应式流
empty() 创建一个空的响应式流
fromEmitter(Emitter<T>) 将一个 Emitter 转换成 Choir 响应式流
fromIterable(Iterable<T>) 将一个 Iterable 转换成 Choir 响应式流
fromPublisher(Publisher<T>) 将一个 Publisher 适配成 Choir 响应式流
just(T... array) 将指定的数组转换成 Choir 响应式流
range(int stop) 从 1 开始,到指定的终止值(不包含)结束,间隔步长为 1,将所有整数转换成响应式流
range(int start, int stop) 从指定的起始值开始,到指定的终止值(不包含)结束,间隔步长为 1,将所有整数转换成响应式流
range(int start, int stop, int step) 从指定的起始值开始,到指定的终止值(不包含)结束,间隔指定的步长,将所有整数转换成响应式流

同时,Choir 提供了多种常用的数据处理方法:

接口 接口说明
buffer(int size) 将数据按照指定大小进行分组,转换成列表数据后继续发送
count() 计算响应式流中的元素数量后,将元素数量发送
distinct() 将上游元素去重后传递给下游
filter(Predicate<T>) 将每个数据按照指定方式判断是否符合要求,并将符合要求的数据继续发送
first() 仅保留第一个元素,并继续发送
first(Predicate<T> filter) 获取满足条件的第一个元素,并继续发送
flatMap(Function<T, Publisher<R>>) 将每个数据通过指定的方式转换为一个响应式流,并将各响应式流中的每个元素依次发送给下游
map(Function<T, R>) 将每个数据通过指定的方式进行转换后继续发送
reduce(BinaryOperator<T>) 将每个数据通过指定的方式进行合并后,形成一个新的数据,并继续发送
skip(int count) 从响应式流的开始跳过指定数量个元素,并继续发送
subscribe() 直接使用 EmptySubscriber 订阅响应式流
subscribe(BiConsumer<Subscription, T>) 使用 Lambda 表达式订阅响应式流,指定 响应式流中的数据消费时的行为,并开始消费响应式流中的数据。
subscribe(Consumer<Subscription>, BiConsumer<Subscription, T> ,Consumer, BiConsumer) 使用 Lambda 表达式订阅响应式流,指定响应式流被订阅时的行为,响应式流中的数据消费时的行为,响应式流正常终结时的行为和响应式流异常终结时的行为,并开始消费响应式流中的数据。
blockAll() 订阅响应式流,并阻塞等待所有结果

10.3.2 Solo 响应式流

Solo接口的使用方式与Choir相似,但Solo响应式流只能拥有 0 或 1 个数据,以下均属于静态方法,返回一个创建后的Solo响应式流实例:

接口 接口说明
create(Consumer<Emitter<T>>) 通过指定的 Emitter 消费逻辑创建 Solo 响应式流
empty() 创建一个空的 Solo 响应式流
fromEmitter(Emitter<T>) 将一个 Emitter 转换成 Solo 响应式流
fromPublisher(Publisher<T>) 将一个 Publisher 适配成 Solo 响应式流
just(T) 将指定的数据转换成 Solo 响应式流

Solo 的常用数据处理接口方法如下:

接口 接口说明
filter(Predicate<T>) 将每个数据按照指定方式判断是否符合要求,并将符合要求的数据继续发送
flatMap(Function<T, Publisher<R>>) 将每个数据通过指定的方式转换为一个响应式流,并将各响应式流中的每个元素依次发送给下游
map(Function<T, R>) 将每个数据通过指定的方式进行转换后继续发送
toChoir() 将当前响应式流转换成 Choir
subscribe() 直接使用 EmptySubscriber 订阅响应式流
subscribe(BiConsumer<Subscription, T>) 使用 Lambda 表达式订阅响应式流,指定 响应式流中的数据消费时的行为,并开始消费响应式流中的数据。
subscribe(Consumer<Subscription>, BiConsumer<Subscription, T> ,Consumer, BiConsumer) 使用 Lambda 表达式订阅响应式流,指定响应式流被订阅时的行为,响应式流中的数据消费时的行为,响应式流正常终结时的行为和响应式流异常终结时的行为,并开始消费响应式流中的数据。
block() 订阅响应式流,并阻塞等待结果

10.3.3 Emitter 数据发送者

Emitter为数据发送者,开发者可以使用 FIT 提供的默认Emitter或自定义的Emitter创建一个响应式流,通过Emitter实例进行数据的流式发送,Emitter的接口方法定义如下:

接口 接口说明
create() 静态方法,创建一个默认的数据发送器
emit(T data) 发送一个指定的数据
complete() 发送一个正常终结信号
fail(Exception cause) 发送一个异常终结信号
observe(Observer observer) 添加一个观察者,用于观察数据发送者的一系列行为

其中,Observer属于内部接口,它表示Emitter的观察者,它含有以下定义:

接口 接口说明
onEmittedData(T data) 当 Emitter.emit(Object) 方法被调用时触发的事件
onCompleted() 当 Emitter.complete() 方法被调用时触发的事件
onFailed(Exception cause) 当 Emitter.fail(Exception) 方法被调用时触发的事件

Emitter添加Observer后,可以观察Emitter的行为,并进行相应事件的触发,例如可以在Observer中添加各类日志等,降低代码的耦合性。

Emitter使用方法示例如下:

Emitter<Integer> emitter = Emitter.create();
emitter.emit(1);
List<Integer> l2 = new ArrayList<>();
Choir.fromEmitter(emitter).subscribe((subscription, i) -> {
    l2.add(i);
    subscription.cancel();
});
emitter.emit(2);
emitter.emit(3);

此处使用了Emitter接口的create()方法创建默认Emitter,开发者也可以选择自定义Emitter进行使用。

10.3.4 Publisher 发布者

Publisher表示发布者,它负责产生数据流并发布给订阅者,开发者可以自定义一个Publisher,并通过Publisher实例创建Choir响应式流,Publisher可以产生一系列数量不限的的元素,并按照其订阅者的要求进行发布,一个发布者可为多个订阅者提供服务,其订阅者通过调用其subscribe(Subscriber)方法对其进行订阅。 以下是Publisher接口的定义:

public interface Publisher<T> {
    /**
     * 向发布者订阅以启动数据发送。
     * <p>该方法可被多次执行,每次将为其订阅者产生一个新的 {@link Subscription},在订阅过程中发生的异常将通过
     * {@link Subscriber#fail(Exception)} 进行传递。</p>
     *
     * @param subscriber 表示已订阅的 {@link Subscriber}{@code <}{@link T}{@code >}。
     */
    void subscribe(Subscriber<T> subscriber);
}

通过Choir接口提供的静态创建方法创建的响应式流已默认实现Publisher,开发者无需在简单使用中自定义Publisher

10.3.5 Subscriber 订阅者

Subscriber表示订阅者,可以处理发布者所发布的数据,以及订阅开始、正常终结和异常终结信号。开发者可以自定义一个Subscriber来控制订阅者逻辑,Subscriber接口方法定义及使用说明如下:

  1. onSubscribed(Subscription subscription):该方法表示订阅关系发生时的事件,它会在Publisher调用其subscribe(Subscriber)函数后被调用。
  2. consume(T data):该方法表示消费所订阅的数据,Publisher所发布的数据将由此方法进行消费,开发者需在此方法中定义数据的消费逻辑。
  3. complete():该方法表示此订阅正常终结,即当Publisher发送正常终结信号后,该Subscriber的任何方法将不会再被调用。
  4. isCompleted():该方法表示查询当前Subscriber是否已正常终结。
  5. fail(Exception cause):该方法表示此订阅异常终结,即当Publisher发送异常终结信号后,该Subscriber的任何方法将不会再被调用。
  6. isFailed():该方法表示查询当前Subscriber是否已经失败。
  7. empty():静态方法,获取一个空的订阅者。
  8. functional(...):静态方法,表示通过指定的 Lambda 表达式,获取一个订阅者。

发布者所发布元素数量不大于其通过Subscription所请求的元素数量,发布者最多发布一个异常或数据结束事件。

在使用Subscriber时,开发者需要自定义Subscriber的订阅和数据消费逻辑,并创建实例订阅响应式流的数据,示例如下:

public class MySubscriber<T> extends EmptySubscriber<T> {
    private Subscription subscription;

    @Override
    public void onSubscribed(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void consume(T data) {
        System.out.println(data);
        this.subscription.request(1);
    }

    @Override
    protected void complete(@Nonnull Subscription subscription) {
        System.out.println("completed");
    }

    @Override
    public void fail(Exception cause) {
        System.out.println("failed");
    }
}

使用该类进行消息的订阅,示例如下:

@Test
void temp() {
    MySubscriber<Integer> subscriber = new MySubscriber<>();
    Choir.just(0, 1, 2).subscribe(subscriber);
}

结果如下:

0
1
2
completed

开发者需要自定义Subscriber的实现,同时应当在使用过程中正确发送completefail信号。

10.3.6 Subscription 订阅关系

Subscription表示PublisherSubscriber之间的订阅关系,Subscription管理着 PublisherSubscriber之间的数据流,同时提供了SubscriberPublisher控制数据的相关方法。以下是Subscription定义的接口方法:

接口 接口说明
request(long count) 请求指定数量的数据。
cancel() 取消当前的订阅关系。
isCancelled() 判断当前订阅关系是否已经取消。

10.4 FIT 响应式编程基础异常

FIT 提供了FlowableException来指示响应式编程的异常,当用户使用自定义响应式编程时,可使用该类来抛出异常,该类方法如下:

  1. FlowableException(String message):通过异常信息来初始化响应式编程框架的基础异常。
  2. FlowableException(Throwable cause):通过异常原因来初始化响应式编程框架的基础异常。
  3. FlowableException(String message, Throwable cause):通过异常信息和异常原因来初始化响应式编程框架的基础异常。