Spring5响应式WEB编程-Webflux示例

2018-12-17

Spring WebFlux简介

Spring WebFlux是随Spring 5推出的响应式Web框架:

image

(左侧为基于spring-webmvc的技术栈,右侧为基于spring-webflux的技术栈)

服务端技术栈

  • Spring WebFlux是基于响应式流的,因此可以用来建立异步的、非阻塞的、事件驱动的服务。它采用Reactor作为首选的响应式流的实现库,不过也提供了对RxJava的支持;
  • 由于响应式编程的特性,Spring WebFlux和Reactor底层需要支持异步的运行环境,比如NettyUndertow;也可以运行在支持异步I/O的Servlet 3.1的容器之上,比如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上);
  • 从图的纵向上看,spring-webflux上层支持两种开发模式:
    • 类似于Spring WebMVC的基于注解(@Controller@RequestMapping)的开发模式;
    • Java 8 lambda 风格的函数式开发模式;
  • 响应式的WebSocket服务端开发;

客户端技术栈

此外,Spring WebFlux也提供了一个响应式的Http客户端API WebClient。它可以用函数式的方式异步非阻塞地发起Http请求并处理响应。

WebClient可以看做是响应式的RestTemplate,相对于后者来说,他的特点是:

  • 是非阻塞的,可以基于少量的线程处理更高的并发;
  • 可以使用Java 8 lambda表达式;
  • 支持异步的同时也可以支持同步的使用方式;
  • 可以通过数据流的方式与服务端进行双向通信;
  • 响应式的WebSocket客户端API开发;

通过Spring Boot实战WebFlux

本文的例子很简单,先直接使用Spring Initializr构建一个简单的SpringBoot项目。

核心依赖

image

对应的完整POM文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wuwenze</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-webflux</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Hello WebFlux

image

从语法上来看,与传统的SpringMVC看起来还是相差无几,启动应用后,发现应用启动于Netty之上。

WebFlux提供了与之前WebMVC相同的一套注解来定义请求的处理,使得Spring使用者迁移到响应式开发方式的过程变得异常轻松。

整个技术栈从命令式的、同步阻塞的【Spring-WebMVC + Servlet + Tomcat】变成了响应式的、异步非阻塞的【Spring-WebFlux + Reactor + Netty】。

image

WebFlux 函数式开发模式

这里还是先来个简单的例子吧,后续再详细讲解:

UserService

package com.wuwenze.springwebflux.service;

import com.google.common.collect.Maps;
import com.wuwenze.springwebflux.entity.User;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Map;

@Service
public class UserService {
    // key, user
    private final Map<String, User> _userMock = Maps.newConcurrentMap();

    {
        _userMock.put("key1", User.builder().id(1).name("user1").build());
        _userMock.put("key2", User.builder().id(2).name("user2").build());
        _userMock.put("key3", User.builder().id(3).name("user3").build());
        _userMock.put("key4", User.builder().id(4).name("user4").build());
        _userMock.put("key5", User.builder().id(5).name("user5").build());
    }

    public Flux<User> list() {
        return Flux.fromIterable(this._userMock.values());
    }

    public Flux<User> findByKeys(Flux<String> keys) {
        return keys.flatMap((key) -> Mono.justOrEmpty(this._userMock.get(key)));
    }

    public Mono<User> getByKey(String key) {
        return Mono.justOrEmpty(this._userMock.get(key))
                .switchIfEmpty(Mono.error(new RuntimeException("#key = " + key + "不存在")));
    }

    public Mono<User> saveOrUpdate(User user) {
        String key = "key" + user.getId();
        return this.save(key, user)
                .onErrorResume((e) -> {
                    // 如果存在,说明数据库存在记录,查找并修改
                    return this.getByKey(key)
                            .flatMap((originalUser) -> {
                                originalUser.setName(user.getName());
                                return this.update(key, originalUser);
                            });
                });
    }

    public Flux<User> saveOrUpdateBatch(Flux<User> users) {
        return users.doOnNext((user) -> this.saveOrUpdate(user));
    }

    public Mono<User> remove(String key) {
        return Mono.justOrEmpty(this._userMock.remove(key));
    }

    // 模拟数据库新增
    private Mono<User> save(String key, User user) {
        if (this._userMock.containsKey(key)) {
            return Mono.error(new RuntimeException());
        }
        this._userMock.put(key, user);
        return Mono.just(user);
    }

    // 模拟数据库修改
    private Mono<User> update(String key, User user) {
        this._userMock.put(key, user);
        return Mono.just(user);
    }
}

UserController

package com.wuwenze.springwebflux.rest;

import com.wuwenze.springwebflux.entity.User;
import com.wuwenze.springwebflux.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@RestController
public class UserController {

    @Autowired
    public UserService userService;

    @GetMapping("/list")
    public Flux<User> list() {
        return this.userService.list();
    }

    // 流式响应,数据每延迟2秒,一批一批到达客户端
    @GetMapping(value = "/list_stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<User> list_stream() {
        return this.userService.list()
                .delayElements(Duration.ofSeconds(2));
    }

    @GetMapping("/get/{key}")
    public Mono<User> get(@PathVariable String key) {
        return this.userService.getByKey(key);
    }

    @PostMapping("/save")
    public Mono<User> save(@RequestBody User user) {
        return this.userService.saveOrUpdate(user);
    }

    @DeleteMapping("/remove/{key}")
    public Mono<User> remove(@PathVariable String key) {
        return this.userService.remove(key);
    }
}

image

上图为/list_stream(流式响应)的测试效果图,其余接口就不贴图了。