Spring5响应式WEB编程-Webflux示例

Spring WebFlux简介

Spring WebFlux是随Spring 5推出的响应式Web框架:



(左侧为基于spring-webmvc的技术栈,右侧为基于spring-webflux的技术栈)

服务端技术栈

  • Spring WebFlux是基于响应式流的,因此可以用来建立异步的、非阻塞的、事件驱动的服务。它采用Reactor作为首选的响应式流的实现库,不过也提供了对RxJava的支持;
  • 由于响应式编程的特性,Spring WebFlux和Reactor底层需要支持异步的运行环境,比如NettyUndertow;也可以运行在支持异步I/O的Servlet 3.1的容器之上,比如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上);
  • 从图的纵向上看,spring-webflux上层支持两种开发模式:
    • 类似于Spring WebMVC的基于注解(@Controller@RequestMapping)的开发模式;
    • Java 8 lambda 风格的函数式开发模式;
  • 响应式的WebSocket服务端开发;

    客户端技术栈

此外,Spring WebFlux也提供了一个响应式的Http客户端API WebClient。它可以用函数式的方式异步非阻塞地发起Http请求并处理响应。

WebClient可以看做是响应式的RestTemplate,相对于后者来说,他的特点是:

  • 是非阻塞的,可以基于少量的线程处理更高的并发;
  • 可以使用Java 8 lambda表达式;
  • 支持异步的同时也可以支持同步的使用方式;
  • 可以通过数据流的方式与服务端进行双向通信;
  • 响应式的WebSocket客户端API开发;

    通过Spring Boot实战WebFlux

本文的例子很简单,先直接使用Spring Initializr构建一个简单的SpringBoot项目。

核心依赖

对应的完整POM文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wuwenze</groupId>
<artifactId>spring-webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-webflux</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

Hello WebFlux

从语法上来看,与传统的SpringMVC看起来还是相差无几,启动应用后,发现应用启动于Netty之上。

WebFlux提供了与之前WebMVC相同的一套注解来定义请求的处理,使得Spring使用者迁移到响应式开发方式的过程变得异常轻松。

整个技术栈从命令式的、同步阻塞的【Spring-WebMVC + Servlet + Tomcat】变成了响应式的、异步非阻塞的【Spring-WebFlux + Reactor + Netty】。

WebFlux 函数式开发模式

这里还是先来个简单的例子吧,后续再详细讲解:

UserService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.wuwenze.springwebflux.service;

import com.google.common.collect.Maps;
import com.wuwenze.springwebflux.entity.User;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Map;

@Service
public class UserService {
// key, user
private final Map<String, User> _userMock = Maps.newConcurrentMap();

{
_userMock.put("key1", User.builder().id(1).name("user1").build());
_userMock.put("key2", User.builder().id(2).name("user2").build());
_userMock.put("key3", User.builder().id(3).name("user3").build());
_userMock.put("key4", User.builder().id(4).name("user4").build());
_userMock.put("key5", User.builder().id(5).name("user5").build());
}

public Flux<User> list() {
return Flux.fromIterable(this._userMock.values());
}

public Flux<User> findByKeys(Flux<String> keys) {
return keys.flatMap((key) -> Mono.justOrEmpty(this._userMock.get(key)));
}

public Mono<User> getByKey(String key) {
return Mono.justOrEmpty(this._userMock.get(key))
.switchIfEmpty(Mono.error(new RuntimeException("#key = " + key + "不存在")));
}

public Mono<User> saveOrUpdate(User user) {
String key = "key" + user.getId();
return this.save(key, user)
.onErrorResume((e) -> {
// 如果存在,说明数据库存在记录,查找并修改
return this.getByKey(key)
.flatMap((originalUser) -> {
originalUser.setName(user.getName());
return this.update(key, originalUser);
});
});
}

public Flux<User> saveOrUpdateBatch(Flux<User> users) {
return users.doOnNext((user) -> this.saveOrUpdate(user));
}

public Mono<User> remove(String key) {
return Mono.justOrEmpty(this._userMock.remove(key));
}

// 模拟数据库新增
private Mono<User> save(String key, User user) {
if (this._userMock.containsKey(key)) {
return Mono.error(new RuntimeException());
}
this._userMock.put(key, user);
return Mono.just(user);
}

// 模拟数据库修改
private Mono<User> update(String key, User user) {
this._userMock.put(key, user);
return Mono.just(user);
}
}

UserController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.wuwenze.springwebflux.rest;

import com.wuwenze.springwebflux.entity.User;
import com.wuwenze.springwebflux.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@RestController
public class UserController {

@Autowired
public UserService userService;

@GetMapping("/list")
public Flux<User> list() {
return this.userService.list();
}

// 流式响应,数据每延迟2秒,一批一批到达客户端
@GetMapping(value = "/list_stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> list_stream() {
return this.userService.list()
.delayElements(Duration.ofSeconds(2));
}

@GetMapping("/get/{key}")
public Mono<User> get(@PathVariable String key) {
return this.userService.getByKey(key);
}

@PostMapping("/save")
public Mono<User> save(@RequestBody User user) {
return this.userService.saveOrUpdate(user);
}

@DeleteMapping("/remove/{key}")
public Mono<User> remove(@PathVariable String key) {
return this.userService.remove(key);
}
}

上图为/list_stream(流式响应)的测试效果图,其余接口就不贴图了。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×