《响应式编程》超详细教程

【第一章】规范

Reactive规范

Reactive Streams 是JVM面向流的库的标准和规范 1、处理可能无限数量的元素 2、有序 3、在组件之间异步传递元素 4、强制性非阻塞,背压模式 什么是正压?什么是被压? 正压就是请求全部一次性打过来,被压就是放在缓冲队列里,按照能力来处理。

线程越多越好吗?

【第二章】核心概念

基于异步、消息驱动的全事件回调系统:响应式系统

API Components: 1.Publisher: 发布者;产生数据流 2.Subscriber: 订阅者;消费数据流 3.Subscription: 订阅关系; a.订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。 4.Processor: 处理器; a.处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在 Reactor 中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。 这种模型遵循 Reactive Streams 规范,确保了异步流的一致性和可靠性。

1、Mono和Flux

Mono: 0|1个元素的流 Flux: N个元素的流; N > 1 发布者发布数据流:源头

2、subscribe()

说明:没订阅前什么也不会发生。

⾃定义流的信号感知回调

flux.subscribe(

v-> System.out.println("v = " + v), //流元素消费

throwable -> System.out.println("throwable = " + throwable), //感知异常结束

()-> System.out.println("流结束了...") //感知正常结束

);

⾃定义消费者

flux.subscribe(new BaseSubscriber() {

// 生命周期钩子1:订阅关系绑定的时候触发

@Override

protected void hookOnSubscribe(Subscription subscription) {

// 流被订阅的时候触发

System.out.println("绑定了...." + subscription);

// 找发布者要数据

request(1); // 要1个数据

requestUnbounded(); // 要无限数据

}

@Override

protected void hookOnNext(String value) {

System.out.println("数据到达,正在处理: " + value);

request(1); // 要1个数据

}

// hookOnComplete、hookOnError 二选一执行

@Override

protected void hookOnComplete() {

System.out.println("流正常结束....");

}

@Override

protected void hookOnError(Throwable throwable) {

System.out.println("流异常...." + throwable);

}

@Override

protected void hookOnCancel() {

System.out.println("流被取消....");

}

@Override

protected void hookFinally(SignalType type) {

System.out.println("最终回调....一定会被执行");

}

});

3、流的取消

消费者调⽤ cancle() 取消流的订阅;

4、BaseSubscriber

自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;

【第三章】代码实例

创建流项目

io.projectreactor

reactor-bom

2023.0.0

pom

import

io.projectreactor

reactor-core

io.projectreactor

reactor-test

test

org.junit.jupiter

junit-jupiter

5.7.2

test

1、简单测试Flux

流不消费是不会运行的,懒惰性。

public void flux() throws IOException {

Flux just = Flux.just(1, 2, 3, 4, 5); //1、多元素的流

//流不消费就没用; 消费:订阅

just.subscribe(e -> System.out.println("e1 = " + e));

//一个数据流可以有很多消费者

just.subscribe(e -> System.out.println("e2 = " + e));

//对于每个消费者来说流都是一样的; 广播模式;

System.out.println("==========");

Flux flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字

flux.subscribe(System.out::println);

System.in.read();//保证主线程不关闭

}

2、自定义流程处理器

1个发布者,

3个流程处理器

1个订阅者

package com.atguigu.flow;

import lombok.SneakyThrows;

import java.util.concurrent.Flow;

import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

//定义流中间操作处理器; 只用写订阅者的接口

static class MyProcessor extends SubmissionPublisher implements Flow.Processor {

private Flow.Subscription subscription; //保存绑定关系

@Override

public void onSubscribe(Flow.Subscription subscription) {

System.out.println("processor订阅绑定完成");

this.subscription = subscription;

subscription.request(1); //找上游要一个数据

}

@Override //数据到达,触发这个回调

public void onNext(String item) {

System.out.println("processor拿到数据:"+item);

//再加工

item += ":哈哈";

submit(item);//把我加工后的数据发出去

subscription.request(1); //再要新数据

}

@Override

public void onError(Throwable throwable) {

}

@Override

public void onComplete() {

}

}

/**

* 1、Publisher:发布者

* 2、Subscriber:订阅者

* 3、Subscription: 订阅关系

* 4、Processor: 处理器

* @param args

*/

//发布订阅模型:观察者模式,

public static void main(String[] args) throws InterruptedException {

//1、定义一个发布者; 发布数据;

SubmissionPublisher publisher = new SubmissionPublisher<>();

//2、定一个中间操作: 给每个元素加个 哈哈 前缀

MyProcessor myProcessor1 = new MyProcessor();

MyProcessor myProcessor2 = new MyProcessor();

MyProcessor myProcessor3 = new MyProcessor();

//3、定义一个订阅者; 订阅者感兴趣发布者的数据;

Flow.Subscriber subscriber = new Flow.Subscriber() {

private Flow.Subscription subscription;

@Override //在订阅时 onXxxx:在xxx事件发生时,执行这个回调

public void onSubscribe(Flow.Subscription subscription) {

System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);

this.subscription = subscription;

//从上游请求一个数据

subscription.request(1);

}

@Override //在下一个元素到达时; 执行这个回调; 接受到新数据

public void onNext(String item) {

System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);

if(item.equals("p-7")){

subscription.cancel(); //取消订阅

}else {

subscription