《响应式编程》超详细教程
【第一章】规范
Reactive规范
Reactive Streams 是JVM面向流的库的标准和规范 1、处理可能无限数量的元素 2、有序 3、在组件之间异步传递元素 4、强制性非阻塞,背压模式 什么是正压?什么是被压? 正压就是请求全部一次性打过来,被压就是放在缓冲队列里,按照能力来处理。
线程越多越好吗?
【第二章】核心概念
基于异步、消息驱动的全事件回调系统:响应式系统
API Components: 1.Publisher: 发布者;产生数据流 2.Subscriber: 订阅者;消费数据流 3.Subscription: 订阅关系; a.订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。 4.Processor: 处理器; a.处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在 Reactor 中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。 这种模型遵循 Reactive Streams 规范,确保了异步流的一致性和可靠性。
1、Mono和Flux
Mono: 0|1个元素的流 Flux: N个元素的流; N > 1 发布者发布数据流:源头
2、subscribe()
说明:没订阅前什么也不会发生。
⾃定义流的信号感知回调
flux.subscribe(
v-> System.out.println("v = " + v), //流元素消费
throwable -> System.out.println("throwable = " + throwable), //感知异常结束
()-> System.out.println("流结束了...") //感知正常结束
);
⾃定义消费者
flux.subscribe(new BaseSubscriber
// 生命周期钩子1:订阅关系绑定的时候触发
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅的时候触发
System.out.println("绑定了...." + subscription);
// 找发布者要数据
request(1); // 要1个数据
requestUnbounded(); // 要无限数据
}
@Override
protected void hookOnNext(String value) {
System.out.println("数据到达,正在处理: " + value);
request(1); // 要1个数据
}
// hookOnComplete、hookOnError 二选一执行
@Override
protected void hookOnComplete() {
System.out.println("流正常结束....");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流异常...." + throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消....");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终回调....一定会被执行");
}
});
3、流的取消
消费者调⽤ cancle() 取消流的订阅;
4、BaseSubscriber
自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;
【第三章】代码实例
创建流项目
1、简单测试Flux
流不消费是不会运行的,懒惰性。
public void flux() throws IOException {
Flux
//流不消费就没用; 消费:订阅
just.subscribe(e -> System.out.println("e1 = " + e));
//一个数据流可以有很多消费者
just.subscribe(e -> System.out.println("e2 = " + e));
//对于每个消费者来说流都是一样的; 广播模式;
System.out.println("==========");
Flux
flux.subscribe(System.out::println);
System.in.read();//保证主线程不关闭
}
2、自定义流程处理器
1个发布者,
3个流程处理器
1个订阅者
package com.atguigu.flow;
import lombok.SneakyThrows;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo {
//定义流中间操作处理器; 只用写订阅者的接口
static class MyProcessor extends SubmissionPublisher
private Flow.Subscription subscription; //保存绑定关系
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("processor订阅绑定完成");
this.subscription = subscription;
subscription.request(1); //找上游要一个数据
}
@Override //数据到达,触发这个回调
public void onNext(String item) {
System.out.println("processor拿到数据:"+item);
//再加工
item += ":哈哈";
submit(item);//把我加工后的数据发出去
subscription.request(1); //再要新数据
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
/**
* 1、Publisher:发布者
* 2、Subscriber:订阅者
* 3、Subscription: 订阅关系
* 4、Processor: 处理器
* @param args
*/
//发布订阅模型:观察者模式,
public static void main(String[] args) throws InterruptedException {
//1、定义一个发布者; 发布数据;
SubmissionPublisher
//2、定一个中间操作: 给每个元素加个 哈哈 前缀
MyProcessor myProcessor1 = new MyProcessor();
MyProcessor myProcessor2 = new MyProcessor();
MyProcessor myProcessor3 = new MyProcessor();
//3、定义一个订阅者; 订阅者感兴趣发布者的数据;
Flow.Subscriber
private Flow.Subscription subscription;
@Override //在订阅时 onXxxx:在xxx事件发生时,执行这个回调
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);
this.subscription = subscription;
//从上游请求一个数据
subscription.request(1);
}
@Override //在下一个元素到达时; 执行这个回调; 接受到新数据
public void onNext(String item) {
System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);
if(item.equals("p-7")){
subscription.cancel(); //取消订阅
}else {
subscription