springboot 使用webflux响应式开发教程(一)

11 篇文章 0 订阅
1 篇文章 0 订阅

什么是webFlux

image
左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router FunctionsWebFluxReactive Streams三个新组件。

  • Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
  • WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
  • Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor

在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux<DataBuffer>格式,以便进行统一处理。

值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequestServerHttpResponse,而不再是Spring MVC里的HttpServletRequestHttpServletResponse

  1. @GetMapping("/reactive/restaurants")
  2. public Flux<Restaurant> findAll() {
  3. return restaurantRepository.findAll();
  4. }

可以看到主要变化就是在 返回的类型上Flux<Restaurant>

Flux和Mono 是 Reactor 中的流数据类型,其中Flux会发送多次,Mono会发送0次或一次

使用webflux需要具备的基础是Reactive programming 的理解。 
Reactor 的基础 和 熟练的java8 lambda使用

创建springboot应用

下面通过创建股票报价的demo来演示。

通过 https://start.spring.io 或idea自带功能创建springboot项目,groupId为io.spring.workshop,artifactId为 stock-quotes。

勾选 ReactiveWeb

 

修改 application.properties 配置文件,指定接口 8081

 

server.port=8081

启动应用,成功后控制台输出日志

日志显示使用Netty而不是tomcat,后续会使用Tomcat

股票报价生成

 

 

定义实体

  1. @Data
  2. public class Quote {
  3. private static final MathContext MATH_CONTEXT = new MathContext(2);
  4. private String ticker;
  5. private BigDecimal price;
  6. private Instant instant;
  7. public Quote() {
  8. }
  9. public Quote(String ticker, BigDecimal price) {
  10. this.ticker = ticker;
  11. this.price = price;
  12. }
  13. public Quote(String ticker, Double price) {
  14. this(ticker, new BigDecimal(price, MATH_CONTEXT));
  15. }
  16. @Override
  17. public String toString() {
  18. return "Quote{" +
  19. "ticker='" + ticker + '\'' +
  20. ", price=" + price +
  21. ", instant=" + instant +
  22. '}';
  23. }
  24. }

定义生成器

 

  1. @Component
  2. public class QuoteGenerator {
  3. private final MathContext mathContext = new MathContext(2);
  4. private final Random random = new Random();
  5. private final List<Quote> prices = new ArrayList<>();
  6. /**
  7. * 生成行情数据
  8. */
  9. public QuoteGenerator() {
  10. this.prices.add(new Quote("CTXS", 82.26));
  11. this.prices.add(new Quote("DELL", 63.74));
  12. this.prices.add(new Quote("GOOG", 847.24));
  13. this.prices.add(new Quote("MSFT", 65.11));
  14. this.prices.add(new Quote("ORCL", 45.71));
  15. this.prices.add(new Quote("RHT", 84.29));
  16. this.prices.add(new Quote("VMW", 92.21));
  17. }
  18. public Flux<Quote> fetchQuoteStream(Duration period) {
  19. // 需要周期生成值并返回,使用 Flux.interval
  20. return Flux.interval(period)
  21. // In case of back-pressure, drop events
  22. .onBackpressureDrop()
  23. // For each tick, generate a list of quotes
  24. .map(this::generateQuotes)
  25. // "flatten" that List<Quote> into a Flux<Quote>
  26. .flatMapIterable(quotes -> quotes)
  27. .log("io.spring.workshop.stockquotes");
  28. }
  29. /**
  30. * Create quotes for all tickers at a single instant.
  31. */
  32. private List<Quote> generateQuotes(long interval) {
  33. final Instant instant = Instant.now();
  34. return prices.stream()
  35. .map(baseQuote -> {
  36. BigDecimal priceChange = baseQuote.getPrice()
  37. .multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);
  38. Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));
  39. result.setInstant(instant);
  40. return result;
  41. })
  42. .collect(Collectors.toList());
  43. }
  44. }

使用webflux创建web应用

webflux的使用有两种方式,基于注解和函数式编程。这里使用函数式编程,先贴代码:

创建QuoteHandler

 

  1. @Component
  2. public class QuoteHandler {
  3. private final Flux<Quote> quoteStream;
  4. public QuoteHandler(QuoteGenerator quoteGenerator) {
  5. this.quoteStream = quoteGenerator.fetchQuoteStream(ofMillis(1000)).share();
  6. }
  7. public Mono<ServerResponse> hello(ServerRequest request) {
  8. return ok().contentType(TEXT_PLAIN)
  9. .body(BodyInserters.fromObject("Hello Spring!"));
  10. }
  11. public Mono<ServerResponse> echo(ServerRequest request) {
  12. return ok().contentType(TEXT_PLAIN)
  13. .body(request.bodyToMono(String.class), String.class);
  14. }
  15. public Mono<ServerResponse> streamQuotes(ServerRequest request) {
  16. return ok()
  17. .contentType(APPLICATION_STREAM_JSON)
  18. .body(this.quoteStream, Quote.class);
  19. }
  20. public Mono<ServerResponse> fetchQuotes(ServerRequest request) {
  21. int size = Integer.parseInt(request.queryParam("size").orElse("10"));
  22. return ok()
  23. .contentType(APPLICATION_JSON)
  24. .body(this.quoteStream.take(size), Quote.class);
  25. }
  26. }

创建Router

 

  1. @Configuration
  2. public class QuoteRouter {
  3. @Bean
  4. public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {
  5. return RouterFunctions
  6. .route(GET("/hello").and(accept(TEXT_PLAIN)), quoteHandler::hello)
  7. .andRoute(POST("/echo").and(accept(TEXT_PLAIN).and(contentType(TEXT_PLAIN))), quoteHandler::echo)
  8. .andRoute(GET("/quotes").and(accept(APPLICATION_JSON)), quoteHandler::fetchQuotes)
  9. .andRoute(GET("/quotes").and(accept(APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);
  10. }
  11. }

需要注意的是在springboot中Handler和Router都需要打上@Configuration。

HTTP请求交由Router转发给对应的Handler,Handler处理请求,并返回Mono<ServerResponse>,这里的Router类似@RequestMapping,Handler类似Controller。这么理解非常容易。

运行项目,浏览器输入 http://localhost:8081/hello 或者 使用curl,即可收到 "Hello Spring!"的文本信息。

到目前为止,一个简单的webflux示例已经完成,但是还没有体现出它与传统模式有何不同。

下面我们来做一下测试:

 

  1. $ curl http://localhost:8081/echo -i -d "WebFlux workshop" -H "Content-Type: text/plain"
  2. HTTP/1.1 200 OK
  3. transfer-encoding: chunked
  4. Content-Type: text/plain
  5. WebFlux workshop

还是没有区别T.T,看下一步。

 

  1. $ curl http://localhost:8081/quotes -i -H "Accept: application/stream+json"
  2. HTTP/1.1 200 OK
  3. transfer-encoding: chunked
  4. Content-Type: application/stream+json
  5. {"ticker":"CTXS","price":82.77,"instant":"2018-05-15T06:45:51.261Z"}
  6. {"ticker":"DELL","price":64.83,"instant":"2018-05-15T06:45:51.261Z"}
  7. {"ticker":"GOOG","price":881,"instant":"2018-05-15T06:45:51.261Z"}
  8. {"ticker":"MSFT","price":67.3,"instant":"2018-05-15T06:45:51.261Z"}
  9. {"ticker":"ORCL","price":48.1,"instant":"2018-05-15T06:45:51.261Z"}
  10. {"ticker":"RHT","price":85.1,"instant":"2018-05-15T06:45:51.261Z"}
  11. {"ticker":"VMW","price":92.24,"instant":"2018-05-15T06:45:51.261Z"}
  1. -------------------------------无敌分割线-------------------------------------
  2. {"ticker":"CTXS","price":85.7,"instant":"2018-05-15T06:45:52.260Z"}
  3. {"ticker":"DELL","price":64.12,"instant":"2018-05-15T06:45:52.260Z"}
  4. {"ticker":"GOOG","price":879,"instant":"2018-05-15T06:45:52.260Z"}
  5. {"ticker":"MSFT","price":67.9,"instant":"2018-05-15T06:45:52.260Z"}
  6. {"ticker":"ORCL","price":46.43,"instant":"2018-05-15T06:45:52.260Z"}
  7. {"ticker":"RHT","price":86.8,"instant":"2018-05-15T06:45:52.260Z"}
...

 

上面的分割线是为了易于分辨人为加上去的,我们看到返回结果每隔一秒刷新一次,不终止的话会一直返回数据,传统的Request/Response是一次请求,一次返回。

注意是设置了Header Accept: application/stream+json ,

如果将Header设置为 Accept: application/json ,只会得到一次Response。

写测试

springboot的test模块包含WebTestClient,可以用来对webflux服务端进行测试。

 

  1. @RunWith(SpringRunner.class)
  2. // We create a `@SpringBootTest`, starting an actual server on a `RANDOM_PORT`
  3. @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  4. public class StockQuotesApplicationTests {
  5. // Spring Boot will create a `WebTestClient` for you,
  6. // already configure and ready to issue requests against "localhost:RANDOM_PORT"
  7. @Autowired
  8. private WebTestClient webTestClient;
  9. @Test
  10. public void fetchQuotes() {
  11. webTestClient
  12. // We then create a GET request to test an endpoint
  13. .get().uri("/quotes?size=20")
  14. .accept(MediaType.APPLICATION_JSON)
  15. .exchange()
  16. // and use the dedicated DSL to test assertions against the response
  17. .expectStatus().isOk()
  18. .expectHeader().contentType(MediaType.APPLICATION_JSON)
  19. .expectBodyList(Quote.class)
  20. .hasSize(20)
  21. // Here we check that all Quotes have a positive price value
  22. .consumeWith(allQuotes ->
  23. assertThat(allQuotes.getResponseBody())
  24. .allSatisfy(quote -> assertThat(quote.getPrice()).isPositive()));
  25. }
  26. @Test
  27. public void fetchQuotesAsStream() {
  28. List<Quote> result = webTestClient
  29. // We then create a GET request to test an endpoint
  30. .get().uri("/quotes")
  31. // this time, accepting "application/stream+json"
  32. .accept(MediaType.APPLICATION_STREAM_JSON)
  33. .exchange()
  34. // and use the dedicated DSL to test assertions against the response
  35. .expectStatus().isOk()
  36. .expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON)
  37. .returnResult(Quote.class)
  38. .getResponseBody()
  39. .take(30)
  40. .collectList()
  41. .block();
  42. assertThat(result).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive());
  43. }
  44. }

后续期待更多精彩(二)

参考文章:

https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web.html#web-reactive-server-functional
http://projectreactor.io/docs
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html
https://blog.csdn.net/qq_34438958/article/details/78539234