什么是webFlux
左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router Functions,WebFlux,Reactive Streams三个新组件。
- Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
- WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
- Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux<DataBuffer>
格式,以便进行统一处理。
值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest
和ServerHttpResponse
,而不再是Spring MVC里的HttpServletRequest
和HttpServletResponse
。
- @GetMapping("/reactive/restaurants")
- public Flux<Restaurant> findAll() {
- return restaurantRepository.findAll();
- }
可以看到主要变化就是在 返回的类型上Flux<Restaurant>
Flux和Mono 是 Reactor 中的流数据类型,其中Flux会发送多次,Mono会发送0次或一次
使用webflux需要具备的基础是Reactive programming 的理解。
Reactor 的基础 和 熟练的java8 lambda使用
创建springboot应用
下面通过创建股票报价的demo来演示。
通过 https://start.spring.io 或idea自带功能创建springboot项目,groupId为io.spring.workshop,artifactId为 stock-quotes。
勾选 ReactiveWeb
修改 application.properties 配置文件,指定接口 8081
server.port=8081
启动应用,成功后控制台输出日志
日志显示使用Netty而不是tomcat,后续会使用Tomcat
股票报价生成
定义实体
- @Data
- public class Quote {
-
- private static final MathContext MATH_CONTEXT = new MathContext(2);
-
- private String ticker;
-
- private BigDecimal price;
-
- private Instant instant;
-
- public Quote() {
- }
-
- public Quote(String ticker, BigDecimal price) {
- this.ticker = ticker;
- this.price = price;
- }
-
- public Quote(String ticker, Double price) {
- this(ticker, new BigDecimal(price, MATH_CONTEXT));
- }
-
- @Override
- public String toString() {
- return "Quote{" +
- "ticker='" + ticker + '\'' +
- ", price=" + price +
- ", instant=" + instant +
- '}';
- }
- }
定义生成器
- @Component
- public class QuoteGenerator {
-
- private final MathContext mathContext = new MathContext(2);
-
- private final Random random = new Random();
-
- private final List<Quote> prices = new ArrayList<>();
-
- /**
- * 生成行情数据
- */
- public QuoteGenerator() {
- this.prices.add(new Quote("CTXS", 82.26));
- this.prices.add(new Quote("DELL", 63.74));
- this.prices.add(new Quote("GOOG", 847.24));
- this.prices.add(new Quote("MSFT", 65.11));
- this.prices.add(new Quote("ORCL", 45.71));
- this.prices.add(new Quote("RHT", 84.29));
- this.prices.add(new Quote("VMW", 92.21));
- }
-
-
- public Flux<Quote> fetchQuoteStream(Duration period) {
-
- // 需要周期生成值并返回,使用 Flux.interval
- return Flux.interval(period)
- // In case of back-pressure, drop events
- .onBackpressureDrop()
- // For each tick, generate a list of quotes
- .map(this::generateQuotes)
- // "flatten" that List<Quote> into a Flux<Quote>
- .flatMapIterable(quotes -> quotes)
- .log("io.spring.workshop.stockquotes");
- }
-
- /**
- * Create quotes for all tickers at a single instant.
- */
- private List<Quote> generateQuotes(long interval) {
- final Instant instant = Instant.now();
- return prices.stream()
- .map(baseQuote -> {
- BigDecimal priceChange = baseQuote.getPrice()
- .multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);
- Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));
- result.setInstant(instant);
- return result;
- })
- .collect(Collectors.toList());
- }
- }
使用webflux创建web应用
webflux的使用有两种方式,基于注解和函数式编程。这里使用函数式编程,先贴代码:
创建QuoteHandler
- @Component
- public class QuoteHandler {
-
- private final Flux<Quote> quoteStream;
-
- public QuoteHandler(QuoteGenerator quoteGenerator) {
- this.quoteStream = quoteGenerator.fetchQuoteStream(ofMillis(1000)).share();
- }
-
- public Mono<ServerResponse> hello(ServerRequest request) {
- return ok().contentType(TEXT_PLAIN)
- .body(BodyInserters.fromObject("Hello Spring!"));
- }
-
- public Mono<ServerResponse> echo(ServerRequest request) {
- return ok().contentType(TEXT_PLAIN)
- .body(request.bodyToMono(String.class), String.class);
- }
-
- public Mono<ServerResponse> streamQuotes(ServerRequest request) {
- return ok()
- .contentType(APPLICATION_STREAM_JSON)
- .body(this.quoteStream, Quote.class);
- }
-
- public Mono<ServerResponse> fetchQuotes(ServerRequest request) {
- int size = Integer.parseInt(request.queryParam("size").orElse("10"));
- return ok()
- .contentType(APPLICATION_JSON)
- .body(this.quoteStream.take(size), Quote.class);
- }
- }
创建Router
- @Configuration
- public class QuoteRouter {
-
- @Bean
- public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {
- return RouterFunctions
- .route(GET("/hello").and(accept(TEXT_PLAIN)), quoteHandler::hello)
- .andRoute(POST("/echo").and(accept(TEXT_PLAIN).and(contentType(TEXT_PLAIN))), quoteHandler::echo)
- .andRoute(GET("/quotes").and(accept(APPLICATION_JSON)), quoteHandler::fetchQuotes)
- .andRoute(GET("/quotes").and(accept(APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);
- }
- }
需要注意的是在springboot中Handler和Router都需要打上@Configuration。
HTTP请求交由Router转发给对应的Handler,Handler处理请求,并返回Mono<ServerResponse>,这里的Router类似@RequestMapping,Handler类似Controller。这么理解非常容易。
运行项目,浏览器输入 http://localhost:8081/hello 或者 使用curl,即可收到 "Hello Spring!"的文本信息。
到目前为止,一个简单的webflux示例已经完成,但是还没有体现出它与传统模式有何不同。
下面我们来做一下测试:
- $ curl http://localhost:8081/echo -i -d "WebFlux workshop" -H "Content-Type: text/plain"
- HTTP/1.1 200 OK
- transfer-encoding: chunked
- Content-Type: text/plain
-
- WebFlux workshop
还是没有区别T.T,看下一步。
- $ curl http://localhost:8081/quotes -i -H "Accept: application/stream+json"
- HTTP/1.1 200 OK
- transfer-encoding: chunked
- Content-Type: application/stream+json
-
- {"ticker":"CTXS","price":82.77,"instant":"2018-05-15T06:45:51.261Z"}
- {"ticker":"DELL","price":64.83,"instant":"2018-05-15T06:45:51.261Z"}
- {"ticker":"GOOG","price":881,"instant":"2018-05-15T06:45:51.261Z"}
- {"ticker":"MSFT","price":67.3,"instant":"2018-05-15T06:45:51.261Z"}
- {"ticker":"ORCL","price":48.1,"instant":"2018-05-15T06:45:51.261Z"}
- {"ticker":"RHT","price":85.1,"instant":"2018-05-15T06:45:51.261Z"}
- {"ticker":"VMW","price":92.24,"instant":"2018-05-15T06:45:51.261Z"}
- -------------------------------无敌分割线-------------------------------------
- {"ticker":"CTXS","price":85.7,"instant":"2018-05-15T06:45:52.260Z"}
- {"ticker":"DELL","price":64.12,"instant":"2018-05-15T06:45:52.260Z"}
- {"ticker":"GOOG","price":879,"instant":"2018-05-15T06:45:52.260Z"}
- {"ticker":"MSFT","price":67.9,"instant":"2018-05-15T06:45:52.260Z"}
- {"ticker":"ORCL","price":46.43,"instant":"2018-05-15T06:45:52.260Z"}
- {"ticker":"RHT","price":86.8,"instant":"2018-05-15T06:45:52.260Z"}
...
上面的分割线是为了易于分辨人为加上去的,我们看到返回结果每隔一秒刷新一次,不终止的话会一直返回数据,传统的Request/Response是一次请求,一次返回。
注意是设置了Header Accept: application/stream+json ,
如果将Header设置为 Accept: application/json ,只会得到一次Response。
写测试
springboot的test模块包含WebTestClient,可以用来对webflux服务端进行测试。
- @RunWith(SpringRunner.class)
- // We create a `@SpringBootTest`, starting an actual server on a `RANDOM_PORT`
- @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- public class StockQuotesApplicationTests {
-
- // Spring Boot will create a `WebTestClient` for you,
- // already configure and ready to issue requests against "localhost:RANDOM_PORT"
- @Autowired
- private WebTestClient webTestClient;
-
- @Test
- public void fetchQuotes() {
- webTestClient
- // We then create a GET request to test an endpoint
- .get().uri("/quotes?size=20")
- .accept(MediaType.APPLICATION_JSON)
- .exchange()
- // and use the dedicated DSL to test assertions against the response
- .expectStatus().isOk()
- .expectHeader().contentType(MediaType.APPLICATION_JSON)
- .expectBodyList(Quote.class)
- .hasSize(20)
- // Here we check that all Quotes have a positive price value
- .consumeWith(allQuotes ->
- assertThat(allQuotes.getResponseBody())
- .allSatisfy(quote -> assertThat(quote.getPrice()).isPositive()));
- }
-
- @Test
- public void fetchQuotesAsStream() {
- List<Quote> result = webTestClient
- // We then create a GET request to test an endpoint
- .get().uri("/quotes")
- // this time, accepting "application/stream+json"
- .accept(MediaType.APPLICATION_STREAM_JSON)
- .exchange()
- // and use the dedicated DSL to test assertions against the response
- .expectStatus().isOk()
- .expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON)
- .returnResult(Quote.class)
- .getResponseBody()
- .take(30)
- .collectList()
- .block();
-
- assertThat(result).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive());
- }
- }
后续期待更多精彩(二)
参考文章:
https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web.html#web-reactive-server-functional
http://projectreactor.io/docs
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html
https://blog.csdn.net/qq_34438958/article/details/78539234