WebFlux概述
简介
- WebFlux是Spring5新添加的模块以用于web开发,功能和SpringMVC类似。
- Webflux使用响应式编程的框架。
- Webflux 是一种异步非阻塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于Reactor的相关 API 实现的。
异步非阻塞
异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同步,如果发送请求之后不等着对方回应就去做其他事情就是异步。
阻塞和非阻塞针对被调用者,被调用者收到请求之后,做完请求任务之后才给出反馈就是阻塞,收到请求之后马上给出反馈然后再去做事情就是非阻塞。
响应式编程
简介
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便
地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。比如Excel表格中的公式,公式的值会随着单元格的变化而变化。
Java8提供观察者模式两个类Observer和Observable,Java9提供Flow类实现响应式编程。
Reactor实现
简介
- Reactor是满足Reactive规范的框架
- Reactor有Mono和Flux两个核心类,这两个类都实现了Publisher接口,提供丰富操作符。Flux对象实现发布者,返回 N 个元素;Mono实现发布者,返回 0 或者1个元素
- Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号
错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了。错误信号终止数据流同时把错误信息传递给订阅者
三种信号特点
- 错误信号和完成信号都是终止信号,不能共存的
- 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
- 如果没有错误信号,没有完成信号,表示是无限数据流
操作符
对数据流进行一道道操作成为操作符,比如工厂流水线。
- map元素映射为新元素
比如上图对1、2和3进行了平方操作然后输出
- flatMap元素映射为流
把每个元素转换流,把转换之后多个流合并大的流
示例代码
1 2 3 4 5 6
| # 引入依赖 <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.1.5.RELEASE</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) {
Flux.just(1,2,3,4).subscribe(System.out::println); Mono.just(1).subscribe(System.out::print); Integer[] array = {1,2,3,4}; Flux.fromArray(array); List<Integer> list = Arrays.asList(array); Flux.fromIterable(list); Stream<Integer> stream = list.stream(); Flux.fromStream(stream); }
|
SpringWebflux执行流程和核心API
SpringWebflux基于 Reactor,默认使用容器是 Netty,Netty是高性能的 NIO(同步非阻
塞) 框架。
执行流程
SpringWebflux核心控制器DispatchHandler,实现接口WebHandler,接口中有个handler方法:
1 2 3
| public interface WebHandler { Mono<Void> handle(ServerWebExchange var1); }
|
1 2 3 4 5 6 7 8 9
| public Mono<Void> handle(ServerWebExchange exchange) { return this.handlerMappings == null ? this.createNotFoundError() : Flux.fromIterable(this.handlerMappings).concatMap((mapping) -> { return mapping.getHandler(exchange); }).next().switchIfEmpty(this.createNotFoundError()).flatMap((handler) -> { return this.invokeHandler(exchange, handler); }).flatMap((result) -> { return this.handleResult(exchange, result); }); }
|
核心API
SpringWebflux里面DispatcherHandler类负责请求的处理,具体如下:
- HandlerMapping
请求查询到处理的方法
- HandlerAdapter
真正负责请求处理
- HandlerResultHandler
响应结果处理
SpringWebflux 实现函数式编程,两个接口:
- RouterFunction
路由处理
- HandlerFunction
处理函数
SpringWebflux基于注解编程
SpringWebflux 使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,SpringBoot自动配置相关运行容器,默认情况下使用 Netty服务器。SpringMVC和SpringWebflux对比如下:
- SpringMVC 方式实现,同步阻塞的方式,基于SpringMVC+Servlet+Tomcat
- SpringWebflux 方式实现,异步非阻塞 方式,基于SpringWebflux+Reactor+Netty
示例代码如下:
这是SpringBoot+Maven项目,需要引入依赖:
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
|
项目结构图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| │ ├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─webflux │ │ │ └─demo │ │ │ │ WebfluxdemoApplication.java │ │ │ │ │ │ │ ├─controller │ │ │ │ UserController.java │ │ │ │ │ │ │ ├─entity │ │ │ │ User.java │ │ │ │ │ │ │ └─service │ │ │ │ UserService.java │ │ │ │ │ │ │ └─impl │ │ │ UserServiceImpl.java │ │ │ │ │ └─resources │ │ application.properties
|
User类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class User { private String name; private String gender; private Integer age;
public User(String name, String gender, Integer age) { this.name = name; this.gender = gender; this.age = age; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getGender() { return gender; }
public void setGender(String gender) { this.gender = gender; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; } }
|
UserService接口:
1 2 3 4 5 6 7 8
| public interface UserService { Mono<User> getUserById(int id); Flux<User> getAllUser(); Mono<Void> saveUserInfo(Mono<User> user); }
|
UserServiceImpl类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Service public class UserServiceImpl implements UserService {
private final Map<Integer,User> users = new HashMap<>(); public UserServiceImpl() { this.users.put(1,new User("lucy","female",20)); this.users.put(2,new User("mary","female",25)); this.users.put(3,new User("jack","male",30)); }
@Override public Mono<User> getUserById(int id) { return Mono.justOrEmpty(this.users.get(id)); }
@Override public Flux<User> getAllUser() { return Flux.fromIterable(this.users.values()); }
@Override public Mono<Void> saveUserInfo(Mono<User> userMono) { return userMono.doOnNext(person -> { int id = users.size()+1; users.put(id,person); }).thenEmpty(Mono.empty()); } }
|
UserController类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @RestController public class UserController {
@Autowired private UserService userService;
@GetMapping("/user/{id}") public Mono<User> geetUserId(@PathVariable int id) { return userService.getUserById(id); }
@GetMapping("/getusers") public Flux<User> getUsers() { return userService.getAllUser(); }
@PostMapping("/saveuser") public Mono<Void> saveUser(@RequestBody User user) { Mono<User> userMono = Mono.just(user); return userService.saveUserInfo(userMono); } }
|
WebfluxdemoApplication类:
1 2 3 4 5 6 7
| @SpringBootApplication public class WebfluxdemoApplication {
public static void main(String[] args) { SpringApplication.run(WebfluxdemoApplication.class,args); } }
|
application.properties:
运行结果:
SpringWebflux基于函数式编程
在使用函数式编程模型操作时候,需要自己初始化服务器。基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。SpringWebflux 请 求 和 响 应 不 再 是 ServletRequest和ServletResponse ,而是ServerRequest 和 ServerResponse。
具体步骤
- 创建Handler(在这里写具体实现方法)
- 创建Router路由,并初始化服务器做适配
- 创建Router路由
- 创建服务器做适配
- 最终调用方法(main)编写
示例代码
项目结构图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─webflux │ │ │ └─demo │ │ │ │ Server.java │ │ │ │ │ │ │ ├─entity │ │ │ │ User.java │ │ │ │ │ │ │ ├─handler │ │ │ │ UserHandler.java │ │ │ │ │ │ │ └─service │ │ │ │ UserService.java │ │ │ │ │ │ │ └─impl │ │ │ UserServiceImpl.java
|
entity和service代码不变,与基于注解的代码一样
UserHandler类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class UserHandler { private final UserService userService;
public UserHandler(UserService userService) { this.userService = userService; }
public Mono<ServerResponse> getUserById(ServerRequest request) { int userId = Integer.valueOf(request.pathVariable("id")); Mono<ServerResponse> notFound = ServerResponse.notFound().build(); Mono<User> userMono = this.userService.getUserById(userId); return userMono.flatMap(person -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(person))) .switchIfEmpty(notFound); }
public Mono<ServerResponse> getAllUsers(ServerRequest request) { Flux<User> users = this.userService.getAllUser(); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class); }
public Mono<ServerResponse> saveUser(ServerRequest request) { Mono<User> userMono = request.bodyToMono(User.class); return ServerResponse.ok().build(this.userService.saveUserInfo(userMono)); } }
|
Server类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Server {
public RouterFunction<ServerResponse> routingFunction() { UserService userService = new UserServiceImpl(); UserHandler handler = new UserHandler(userService); return RouterFunctions.route(GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById) .andRoute(GET("/getallusers").and(accept(APPLICATION_JSON)),handler::getAllUsers); }
public void createReactorServer() { RouterFunction<ServerResponse> route = routingFunction(); HttpHandler httpHandler = toHttpHandler(route); ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler); HttpServer httpServer = HttpServer.create(); httpServer.handle(adapter).bindNow(); }
public static void main(String[] args) throws Exception{ Server server = new Server(); server.createReactorServer(); System.out.println("enter to exit"); System.in.read(); } }
|
运行main方法后效果:
1 2 3 4 5
| 16:05:18.184 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled 16:05:18.184 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0 16:05:18.184 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384 16:05:18.204 [reactor-http-nio-1] DEBUG reactor.netty.tcp.TcpServer - [id: 0x16e3bd47, L:/0:0:0:0:0:0:0:0:51646] Bound new server enter to exit
|
运行成功会有一个端口,比如我这里是51646,然后在浏览器中用这个端口请求就可以了。如下所示:
如有错误,欢迎指正!