Spring 实战(第五版)
  • Spring 实战(第 5 版)
  • 第一部分 Spring 基础
  • 第 1 章 Spring 入门
    • 1.1 什么是 Spring?
    • 1.2 初始化 Spring 应用程序
      • 1.2.1 使用 Spring Tool Suite 初始化 Spring 项目
      • 1.2.2 检查 Spring 项目结构
    • 1.3 编写 Spring 应用程序
      • 1.3.1 处理 web 请求
      • 1.3.2 定义视图
      • 1.3.3 测试控制器
      • 1.3.4 构建并运行应用程序
      • 1.3.5 了解 Spring Boot DevTools
      • 1.3.6 回顾
    • 1.4 俯瞰 Spring 风景线
      • 1.4.1 Spring 核心框架
      • 1.4.2 Spring Boot
      • 1.4.3 Spring Data
      • 1.4.4 Spring Security
      • 1.4.5 Spring Integration 和 Spring Batch
      • 1.4.6 Spring Cloud
    • 1.5 小结
  • 第 2 章 开发 Web 应用程序
    • 2.1 展示信息
      • 2.1.1 建立域
      • 2.1.2 创建控制器类
      • 2.1.3 设计视图
    • 2.2 处理表单提交
    • 2.3 验证表单输入
      • 2.3.1 声明验证规则
      • 2.3.2 在表单绑定时执行验证
      • 2.3.3 显示验证错误
    • 2.4 使用视图控制器
    • 2.5 选择视图模板库
      • 2.5.1 缓存模板
    • 2.6 小结
  • 第 3 章 处理数据
    • 3.1 使用 JDBC 读写数据
      • 3.1.1 为域适配持久化
      • 3.1.2 使用 JdbcTemplate
      • 3.1.3 定义模式并预加载数据
      • 3.1.4 插入数据
    • 3.2 使用 Spring Data JPA 持久化数据
      • 3.2.1 添加 Spring Data JPA 到数据库中
      • 3.2.2 注解域作为实体
      • 3.2.3 声明 JPA repository
      • 3.2.4 自定义 JPA repository
    • 3.3 小结
  • 第 4 章 Spring 安全
    • 4.1 启用 Spring Security
    • 4.2 配置 Spring Security
      • 4.2.1 内存用户存储
      • 4.2.2 基于 JDBC 的用户存储
      • 4.2.3 LDAP 支持的用户存储
      • 4.2.4 自定义用户身份验证
    • 4.3 保护 web 请求
      • 4.3.1 保护请求
      • 4.3.2 创建用户登录页面
      • 4.3.3 登出
      • 4.3.4 阻止跨站请求伪造攻击
    • 4.4 了解你的用户
    • 4.5 小结
  • 第 5 章 使用配置属性
    • 5.1 微调自动配置
      • 5.1.1 理解 Spring 环境抽象
      • 5.1.2 配置数据源
      • 5.1.3 配置嵌入式服务器
      • 5.1.4 配置日志
      • 5.1.5 使用特殊的属性值
    • 5.2 创建自己的配置属性
      • 5.2.1 定义配置属性持有者
      • 5.2.2 声明配置属性元数据
    • 5.3 使用 profile 文件进行配置
      • 5.3.1 定义特定 profile 的属性
      • 5.3.2 激活 profile 文件
      • 5.3.3 有条件地使用 profile 文件创建 bean
    • 5.4 小结
  • 第二部分 集成 Spring
  • 第 6 章 创建 REST 服务
    • 6.1 编写 RESTful 控制器
      • 6.1.1 从服务器获取数据
      • 6.1.2 向服务器发送数据
      • 6.1.3 更新服务器上的资源
      • 6.1.4 从服务器删除数据
    • 6.2 启用超媒体
      • 6.2.1 添加超链接
      • 6.2.2 创建资源装配器
      • 6.2.3 嵌套命名关系
    • 6.3 启用以数据为中心的服务
      • 6.3.1 调整资源路径和关系名称
      • 6.3.2 分页和排序
      • 6.3.3 添加用户端点
      • 6.3.4 向 Spring Data 端点添加用户超链接
    • 6.4 小结
  • 第 7 章 调用 REST 服务
    • 7.1 使用 RestTemplate 调用 REST 端点
      • 7.1.1 请求 GET 资源
      • 7.1.2 请求 PUT 资源
      • 7.1.3 请求 DELETE 资源
      • 7.1.4 请求 POST 资源
    • 7.2 使用 Traverson 引导 REST API
    • 7.3 小结
  • 第 8 章 发送异步消息
    • 8.1 使用 JMS 发送消息
      • 8.1.3 接收 JMS 消息
      • 8.1.2 使用 JmsTemplate 发送消息
      • 8.1.1 设置 JMS
    • 8.2 使用 RabbitMQ 和 AMQP
      • 8.2.1 添加 RabbitMQ 到 Spring 中
      • 8.2.2 使用 RabbitTemplate 发送消息
      • 8.2.3 从 RabbitMQ 接收消息
    • 8.3 使用 Kafka 发送消息
      • 8.3.1 在 Spring 中设置 Kafka
      • 8.3.2 使用 KafkaTemplate 发送消息
      • 8.3.3 编写 Kafka 监听器
    • 8.4 小结
  • 第 9 章 集成 Spring
    • 9.1 声明简单的集成流
      • 9.1.1 使用 XML 定义集成流
      • 9.1.2 在 Java 中配置集成流
      • 9.1.3 使用 Spring Integration 的 DSL 配置
    • 9.2 探索 Spring Integration
      • 9.2.1 消息通道
      • 9.2.2 过滤器
      • 9.2.3 转换器
      • 9.2.4 路由
      • 9.2.5 分割器
      • 9.2.6 服务激活器
      • 9.2.7 网关
      • 9.2.8 通道适配器
      • 9.2.9 端点模块
    • 9.3 创建 Email 集成流
    • 9.4 总结
  • 第三部分 响应式 Spring
  • 第 10 章 Reactor 介绍
    • 10.1 理解响应式编程
      • 10.1.1 定义响应式流
    • 10.2 Reactor
      • 10.2.1 图解响应式流
      • 10.2.2 添加 Reactor 依赖
    • 10.3 通用响应式操作实战
      • 10.3.1 创建响应式类型
      • 10.3.2 响应式类型结合
      • 10.3.3 转换和过滤响应式流
      • 10.3.4 对反应类型执行逻辑操作
    • 10.4 总结
  • 第 11 章 开发响应式 API
    • 11.1 使用 Spring WebFlux
      • 11.1.1 Spring WebFlux 介绍
      • 11.1.2 编写响应式 Controller
    • 11.2 定义函数式请求处理程序
    • 11.3 测试响应式 Controller
      • 11.3.1 测试 GET 请求
      • 11.3.2 测试 POST 请求
      • 11.3.3 使用线上服务器进行测试
    • 11.4 响应式消费 REST API
      • 11.4.1 通过 GET 方式获取资源
      • 11.4.2 通过 POST 方式发送资源
      • 11.4.3 删除资源
      • 11.4.4 处理请求错误
      • 11.4.5 请求转换
    • 11.5 保护响应式 web API
      • 11.5.1 配置响应式 Web 安全
      • 11.5.2 配置响应式用户信息服务
    • 11.6 总结
  • 第 12 章 响应式持久化数据
    • 12.1 理解 Spring Data 响应式历程
      • 12.1.1 Spring Data 响应式精髓
      • 12.1.2 在响应式与非响应式之间进行转换
      • 12.1.3 开发响应式库
    • 12.2 使用响应式 Cassandra 库
      • 12.2.1 开启 Spring Data Cassandra
      • 12.2.2 理解 Cassandra 数据模型
      • 12.2.3 Cassandra 持久化实体映射
      • 12.2.4 编写响应式 Cassandra 库
    • 12.3 编写响应式 MongoDB 库
      • 12.3.1 开启Spring Data MongonDB
      • 12.3.2 MongoDB 持久化实体映射
      • 12.3.3 编写响应式 MongoDB 库
    • 12.4 总结
  • 第四部分 云原生 Spring
  • 第 13 章 服务发现
    • 13.1深入思考微服务
    • 13.2 配置服务注册
      • 13.2.1 配置 Eureka
      • 13.2.2 扩展 Eureka
    • 13.3 注册并发现服务
      • 13.3.1 配置 Eureka 客户端属性
      • 13.3.2 消费服务
    • 13.4 总结
  • 第 14 章 配置管理
    • 14.1 共享配置
    • 14.2 运行配置服务器
      • 14.2.1 启动配置服务器
      • 14.2.2 填写配置库
    • 14.3 消费共享的配置
    • 14.4 服务应用程序和特定配置文件的属性
      • 14.4.1 服务特定应用程序的属性
      • 14.4.2 服务配置文件属性
    • 14.5 为配置的属性加密
      • 14.5.1 在 Git 中加密属性
      • 14.5.2 在 Vault 中存储密码
    • 14.6 远程刷新配置属性
      • 14.6.1 手动刷新配置属性
      • 14.6.2 自动刷新配置属性
    • 14.7 总结
  • 第 15 章 处理失败和时延
    • 15.1 了解断路器
    • 15.2 定义断路器
      • 15.2.1 缓解时延
      • 15.2.2 管理断路器阈值
    • 15.3 管理失败事件
      • 15.3.1 介绍 Hystrix 面板
      • 15.3.2 了解 Hystrix 线程池
    • 15.4 聚合多个 Hystrix 流
    • 15.5 总结
  • 第五部分 部署Spring
  • 第 16 章 使用 SpringBoot Actuator
    • 16.1 介绍 Actuator
      • 16.1.1 配置 Actuator 基本路径
      • 16.1.2 启用和禁用 Actuator 端点
    • 16.2 使用 Actuator 端点
      • 16.2.1 获取重要的应用程序信息
      • 16.2.2 查看配置详细信息
      • 16.2.3 查看应用程序活动
      • 16.2.4 利用运行时指标
    • 16.3 自定义 Actuator
      • 16.3.1 向 /info 端点提供信息
      • 16.3.2 自定义健康指标
      • 16.3.3 注册自定义指标
      • 16.3.4 创建自定义端点
    • 16.4 保护 Actuator
    • 16.5 总结
  • 第 17 章 管理 Spring
    • 17.1 使用 SpringBoot Admin
      • 17.1.1 创建 Admin 服务端
      • 17.1.2 注册 Admin 客户端
    • 17.2 深入 Admin 服务端
      • 17.2.1 查看普通应用程序运行状况和信息
      • 17.2.2 观察关键指标
      • 17.2.3 检查环境属性
      • 17.2.4 查看并设置 log 级别
      • 17.2.5 监控线程
      • 17.2.6 追踪 HTTP 请求
    • 17.3 保护 Admin 服务端
      • 17.3.1 在 Admin 服务端中启用登录
      • 17.3.2 使用 Actuator 进行认证
    • 17.4 总结
  • 第 18 章 使用 JMX 监控 Spring
    • 18.1 使用 Actuator MBean
    • 18.2 创建自己的 MBean
    • 18.3 发送通知
    • 18.4 总结
  • 第 19 章 部署 Spring
    • 19.1 权衡部署选项
    • 19.2 构建并部署 WAR 文件
    • 19.3 将 JAR 文件推送到 Cloud Foundry
    • 19.4 在 Docker 容器中运行 SpringBoot
    • 19.5 终章
    • 19.6 总结
由 GitBook 提供支持
在本页

这有帮助吗?

  1. 第 10 章 Reactor 介绍
  2. 10.3 通用响应式操作实战

10.3.3 转换和过滤响应式流

上一页10.3.2 响应式类型结合下一页10.3.4 对反应类型执行逻辑操作

最后更新于2年前

这有帮助吗?

当数据流过 stream,你可能需要过滤或是修改一些值。在本节中,我们将看到的是转换和过滤流过响应式流中的数据。

从响应式类型中过滤数据

当数据从 Flux 中流出时,过滤数据的最基本方法之一就是简单地忽略前几个条目。如图 10.10 所示,skip() 操作正是这样做的。

给定一个包含多个条目的 Flux,skip() 操作将创建一个新的 Flux,该 Flux 在从源 Flux 发出剩余项之前跳过指定数量的项。下面的测试方法演示如何使用 skip():

@Test
public void skipAFew() {
    Flux<String> skipFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .skip(3);

    StepVerifier.create(skipFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
}

在本例中,有五个字符串项的流。对该流调用 skip(3) 将生成一个新的流,该流跳过前三个项,并且只发布最后两个项。

你也许不是想跳过特定数量的项目,而是需要过一段时间再跳过前几个项目。skip() 操作的另一种形式(如图 10.11 所示)是生成一个流,该流在从源流发出项之前等待一段指定的时间。

下面的测试方法使用 skip() 创建一个在发出任何值之前等待 4 秒的 Flux。由于该 Flux 是从项之间具有 1 秒延迟(使用 delayElements())的 Flux 创建的,因此只会发出最后两个项:

@Test
public void skipAFewSeconds() {
    Flux<String> skipFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .delayElements(Duration.ofSeconds(1))
        .skip(Duration.ofSeconds(4));

    StepVerifier.create(skipFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
}

你已经看到了 take() 操作的一个例子,但是根据 skip() 操作,take() 可以看作是 skip() 的反面。skip() 跳过前几个项,take() 只发出前几个项(如图 10.12 所示):

@Test
public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .take(3);

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

与 skip() 一样,take() 也有一个基于持续时间而不是项目计数的可选项。它会在一段时间之后,将接收并发出与通过源 Flux 一样多的项。如图 10.13 所示:

以下测试方法使用 take() 的替代形式在订阅后的前 3.5 秒内发出尽可能多的项:

@Test
public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

skip() 和 take() 操作可以看作是基于计数或持续时间的筛选条件的操作。对于更通用的 Flux 值过滤,会发现filter() 操作非常有用。

给定一个决定一个项是否通过 Flux 的 Predicate,filter() 操作允许你根据需要的任何条件有选择地发布。图 10.14 中的弹珠图显示了 filter() 的工作原理。

要查看 filter() 的运行情况,请考虑以下测试方法:

@Test
public void filter() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .filter(np -> !np.contains(" "));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Zion")
        .verifyComplete();
}

这里,filter() 被赋予一个 Predicate,它只接受没有空格的 String。因此,“Grand Canyon” 和 “Grand Teton” 被过滤掉。

也许你需要过滤的是你已经收到的任何项目。distinct() 操作(如图 10.15 所示)产生一个只发布源 Flux 中尚未发布的项的 Flux。

在下面的测试中,只有唯一的 String 值将从不同的 Flux 中发出:

@Test
public void distinct() {
    Flux<String> animalFlux = Flux.just(
        "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();

    StepVerifier.create(animalFlux)
        .expectNext("dog", "cat", "bird", "anteater")
        .verifyComplete();
}

尽管 “dog” 和 “bird” 分别从源 Flux 中发布两次,但在 distinct Flux 中只发布一次。

映射响应式数据

对于 Flux 或 Mono,最常用的操作之一是将已发布的项转换为其他形式或类型。Reactor 为此提供 map() 和flatMap() 操作。

map() 操作会创建一个 Flux,该 Flux 在重新发布之前,按照给定函数对其接收的每个对象执行指定的转换。图 10.16 说明了 map() 操作的工作原理。

在以下测试方法中,表示篮球运动员的 String 值的 Flux 映射到 Player 对象的新 Flux:

@Test
public void map() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .map(n -> {
            String[] split = n.split("\\s");
            return new Player(split[0], split[1]);
        });

    StepVerifier.create(playerFlux)
        .expectNext(new Player("Michael", "Jordan"))
        .expectNext(new Player("Scottie", "Pippen"))
        .expectNext(new Player("Steve", "Kerr"))
        .verifyComplete();
}

给 map() 的 Function 接口(作为 lambda)将传入 String 以空格进行拆分,并使用生成的字符串数组创建 Player 对象。虽然用 just() 创建的流携带的是 String 对象,但是由 map() 生成的流携带的是 Player 对象。

关于 map() 的重要理解是,映射是同步执行的,因为每个项都是由源 Flux 发布的。如果要异步执行映射,应考虑使用 flatMap() 操作。

flatMap() 操作需要一些思考和实践才能变得很熟练。如图 10.17 所示,flatMap() 不是简单地将一个对象映射到另一个对象,而是将每个对象映射到一个新的 Mono 或 Flux。Mono 或 Flux 的结果被压成一个新的 Flux。当与subscribeOn() 一起使用时,flatMap() 可以释放 Reactor 类型的异步能力。

下面的测试方法展示了 flatMap() 和 subscribeOn() 的用法:

@Test
public void flatMap() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .flatMap(n -> Mono.just(n).map(p -> {
            String[] split = p.split("\\s");
            return new Player(split[0], split[1]);
        })
        .subscribeOn(Schedulers.parallel())
        );

    List<Player> playerList = Arrays.asList(
        new Player("Michael", "Jordan"),
        new Player("Scottie", "Pippen"),
        new Player("Steve", "Kerr"));

    StepVerifier.create(playerFlux)
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .verifyComplete();
}

请注意,flatMap() 被赋予一个 lambda 函数,该函数将传入 String 转换为 String 类型的 Mono。然后对 Mono 应用 map() 操作,将 String 转换为 Player。

如果你停在那里,产生的 Flux 将携带 Player 对象,以与 map() 示例相同的顺序同步生成。但是对 Mono 做的最后一件事是调用 subscribeOn() 来指示每个订阅应该在一个并行线程中进行。因此,可以异步和并行地执行多个传入 String 对象的映射操作。

尽管 subscribeOn() 的名称与 subscribe() 类似,但它们却截然不同。subscribe() 是一个动词,它订阅一个响应式流并有效地将其启动,而 subscribeOn() 则更具描述性,它指定了应该 如何 并发地处理订阅。Reactor 不强制任何特定的并发模型;通过 subscribeOn() 可以使用 Schedulers 程序中的一个静态方法指定要使用的并发模型。在本例中,使用了 parallel(),它是使用固定大小线程池的工作线程(大小与 CPU 内核的数量一样)。但是调度程序支持多个并发模型,如下表所示:

Schedulers 方法
描述

.immediate()

在当前线程中执行订阅

.single()

在单个可重用线程中执行订阅,对所有调用方重复使用同一线程

.newSingle()

在每个调用专用线程中执行订阅

.elastic()

在从无限弹性池中提取的工作进程中执行订阅,根据需要创建新的工作线程,并释放空闲的工作线程(默认情况下 60 秒)

.parallel()

在从固定大小的池中提取的工作进程中执行订阅,该池的大小取决于 CPU 核心的数量。

使用 flatMap() 和 subscribeOn() 的好处是,可以通过将工作分成多个并行线程来增加流的吞吐量。但由于这项工作是并行完成的,无法保证先完成哪项工作,因此无法知道产生的 Flux 中排放的项目的顺序。因此,StepVerifier 只能验证发出的每个项是否存在于 Player 对象的预期列表中,并且在 Flux 完成之前将有三个这样的项。

在响应式流上缓冲数据

在处理流经 Flux 的数据的过程中,你可能会发现将数据流分解成比特大小的块是有帮助的。buffer() 操作(如图 10.18 所示)可以解决这个问题。

给定一个 String 值的 Flux,每个值都包含一个水果的名称,你可以创建一个新的 List 集合的 Flux,其中每个 List 的元素数不超过指定的数目:

@Test
public void buffer() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

    StepVerifier
        .create(bufferedFlux)
        .expectNext(Arrays.asList("apple", "orange", "banana"))
        .expectNext(Arrays.asList("kiwi", "strawberry"))
        .verifyComplete();
}

在这种情况下,String 元素的 Flux 被缓冲到一个 List 集合的新 Flux 中,每个 List 集合包含的项不超过三个。因此,发出 5 个 String 的原始 Flux 将转换为发出两个List 集合的 Flux,一个包含 3 个水果,另一个包含 2 个水果。

那又怎么样?将值从响应式 Flux 缓冲到非响应式 List 集合似乎适得其反。但是,当将buffer() 与 flatMap() 结合使用时,它可以并行处理每个 List 集合:

Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
    .buffer(3)
    .flatMap(x -> 
         Flux.fromIterable(x)
             .map(y -> y.toUpperCase())
             .subscribeOn(Schedulers.parallel())
             .log()
    ).subscribe();

在这个新示例中,仍然将 5 个 String 值的 Flux 缓冲到 List 集合的新 Flux 中,然后将 flatMap() 应用于 List 集合的 Flux。这将获取每个 List 缓冲区并从其元素创建一个新的 Flux,然后对其应用 map() 操作。因此,每个缓冲 List 在单独的线程中进一步并行处理。

为了证明它是有效的,我还包含了一个要应用于每个子 Flux 的 log() 操作。log() 操作只记录所有的 Reactor Streams 事件,这样你就可以看到真正发生了什么。因此,以下条目将写入日志(为了简洁起见,删除了时间组件):

[main] INFO reactor.Flux.SubscribeOn.1 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

日志条目清楚地显示,第一个缓冲区(apple、orange 和 banana)中的水果在 parallel-1 线程中处理。同时,在第二个缓冲区(kiwi 和 strawberry)中的水果在 parallel-2 线程中进行处理。从每个缓冲区的日志条目交织在一起这一事实可以明显看出,这两个缓冲区是并行处理的。

如果出于某种原因,需要将 Flux 发出的所有内容收集到 List 中,则可以调用不带参数的 buffer():

Flux<List<List>> bufferedFlux = fruitFlux.buffer();

这将产生一个新的 Flux,该 Flux 会发出一个包含源 Flux 发布的所有项的 List。使用 collectList() 操作也可以实现同样的功能,如图 10.19 中的弹珠图所示:

collectList() 生成一个发布 List 的 Mono,而不是生成一个发布 List 的 Flux。以下测试方法说明了如何使用它:

@Test
public void collectList() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");
    Mono<List<String>> fruitListMono = fruitFlux.collectList();

    StepVerifier
        .create(fruitListMono)
        .expectNext(Arrays.asList(
            "apple", "orange", "banana", "kiwi", "strawberry"))
        .verifyComplete();
}

一种更有趣的收集 Flux 发送的项目的方法是把它们存到 Map 中。如图 10.20 所示,collectMap() 操作产生一个 Mono,它发布一个 Map,其中填充了由给定 Function 计算其键值的条目。

要查看 collectMap() 的实际操作,请查看以下测试方法:

@Test
public void collectMap() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
    Mono<Map<Character, String>> animalMapMono =
        animalFlux.collectMap(a -> a.charAt(0));

    StepVerifier
        .create(animalMapMono)
        .expectNextMatches(map -> {
            return
                map.size() == 3 &&
                map.get('a').equals("aardvark") &&
                map.get('e').equals("eagle") &&
                map.get('k').equals("kangaroo");
        })
        .verifyComplete();
}

源 Flux 发出了一些动物的名字。在该 Flux 中,可以使用 collectMap() 创建一个新的 Mono,该 Mono 发送一个 Map,其中的键值由动物名称的第一个字母确定,并且该值是动物名称本身。如果两个动物名以同一个字母开头(如 elephant 和 eagle 或 koala 和 kangaroo),则流经流的最后一个条目将覆盖所有先前的条目。

图 10.10 skip 操作在将剩余消息传递给结果 Flux 之前跳过指定数量的消息
图 10.11 skip 操作的另一种形式是在将消息传递到结果 Flux 之前等待一段时间
图 10.12 take 操作只传递来自传入 Flux 的前几个消息,然后取消订阅
图 10.13 take 操作的另一种形式是在某个时间过去后,将消息传递给结果 Flux
图 10.14 传入的 Flux 可以被过滤,以便生成的 Flux 只接收与给定谓词匹配的消息
图 10.15 distinct 操作过滤掉所有重复的消息
图 10.16 map 操作在结果 Flux 上执行将传入消息转换为新消息
图 10.17 转换映射操作使用中间 Flux 来执行转换,从而允许异步转换
图 10.18 缓冲区操作会产生一个给定最大大小的 List Flux,这些 List 是从传入的 Flux 中收集的
图 10.19 collect-list 操作产生一个 Mono,其中包含由传入 Flux 发出的所有消息的列表
图 10.20 collect-map 操作产生一个 Mono,其中包含由传入 Flux 发出的消息的 Map,其中的键来自传入消息的某些特性