Spring 实战(第五版)
  • Spring 实战(第 5 版)
  • 第一部分 Spring 基础
  • 第 1 章 Spring 入门
    • 1.1 什么是 Spring?
    • 1.2 初始化 Spring 应用程序
      • 1.2.1 使用 Spring Tool Suite 初始化 Spring 项目
      • 1.2.2 检查 Spring 项目结构
    • 1.3 编写 Spring 应用程序
      • 1.3.1 处理 web 请求
      • 1.3.2 定义视图
      • 1.3.3 测试控制器
      • 1.3.4 构建并运行应用程序
      • 1.3.5 了解 Spring Boot DevTools
      • 1.3.6 回顾
    • 1.4 俯瞰 Spring 风景线
      • 1.4.1 Spring 核心框架
      • 1.4.2 Spring Boot
      • 1.4.3 Spring Data
      • 1.4.4 Spring Security
      • 1.4.5 Spring Integration 和 Spring Batch
      • 1.4.6 Spring Cloud
    • 1.5 小结
  • 第 2 章 开发 Web 应用程序
    • 2.1 展示信息
      • 2.1.1 建立域
      • 2.1.2 创建控制器类
      • 2.1.3 设计视图
    • 2.2 处理表单提交
    • 2.3 验证表单输入
      • 2.3.1 声明验证规则
      • 2.3.2 在表单绑定时执行验证
      • 2.3.3 显示验证错误
    • 2.4 使用视图控制器
    • 2.5 选择视图模板库
      • 2.5.1 缓存模板
    • 2.6 小结
  • 第 3 章 处理数据
    • 3.1 使用 JDBC 读写数据
      • 3.1.1 为域适配持久化
      • 3.1.2 使用 JdbcTemplate
      • 3.1.3 定义模式并预加载数据
      • 3.1.4 插入数据
    • 3.2 使用 Spring Data JPA 持久化数据
      • 3.2.1 添加 Spring Data JPA 到数据库中
      • 3.2.2 注解域作为实体
      • 3.2.3 声明 JPA repository
      • 3.2.4 自定义 JPA repository
    • 3.3 小结
  • 第 4 章 Spring 安全
    • 4.1 启用 Spring Security
    • 4.2 配置 Spring Security
      • 4.2.1 内存用户存储
      • 4.2.2 基于 JDBC 的用户存储
      • 4.2.3 LDAP 支持的用户存储
      • 4.2.4 自定义用户身份验证
    • 4.3 保护 web 请求
      • 4.3.1 保护请求
      • 4.3.2 创建用户登录页面
      • 4.3.3 登出
      • 4.3.4 阻止跨站请求伪造攻击
    • 4.4 了解你的用户
    • 4.5 小结
  • 第 5 章 使用配置属性
    • 5.1 微调自动配置
      • 5.1.1 理解 Spring 环境抽象
      • 5.1.2 配置数据源
      • 5.1.3 配置嵌入式服务器
      • 5.1.4 配置日志
      • 5.1.5 使用特殊的属性值
    • 5.2 创建自己的配置属性
      • 5.2.1 定义配置属性持有者
      • 5.2.2 声明配置属性元数据
    • 5.3 使用 profile 文件进行配置
      • 5.3.1 定义特定 profile 的属性
      • 5.3.2 激活 profile 文件
      • 5.3.3 有条件地使用 profile 文件创建 bean
    • 5.4 小结
  • 第二部分 集成 Spring
  • 第 6 章 创建 REST 服务
    • 6.1 编写 RESTful 控制器
      • 6.1.1 从服务器获取数据
      • 6.1.2 向服务器发送数据
      • 6.1.3 更新服务器上的资源
      • 6.1.4 从服务器删除数据
    • 6.2 启用超媒体
      • 6.2.1 添加超链接
      • 6.2.2 创建资源装配器
      • 6.2.3 嵌套命名关系
    • 6.3 启用以数据为中心的服务
      • 6.3.1 调整资源路径和关系名称
      • 6.3.2 分页和排序
      • 6.3.3 添加用户端点
      • 6.3.4 向 Spring Data 端点添加用户超链接
    • 6.4 小结
  • 第 7 章 调用 REST 服务
    • 7.1 使用 RestTemplate 调用 REST 端点
      • 7.1.1 请求 GET 资源
      • 7.1.2 请求 PUT 资源
      • 7.1.3 请求 DELETE 资源
      • 7.1.4 请求 POST 资源
    • 7.2 使用 Traverson 引导 REST API
    • 7.3 小结
  • 第 8 章 发送异步消息
    • 8.1 使用 JMS 发送消息
      • 8.1.3 接收 JMS 消息
      • 8.1.2 使用 JmsTemplate 发送消息
      • 8.1.1 设置 JMS
    • 8.2 使用 RabbitMQ 和 AMQP
      • 8.2.1 添加 RabbitMQ 到 Spring 中
      • 8.2.2 使用 RabbitTemplate 发送消息
      • 8.2.3 从 RabbitMQ 接收消息
    • 8.3 使用 Kafka 发送消息
      • 8.3.1 在 Spring 中设置 Kafka
      • 8.3.2 使用 KafkaTemplate 发送消息
      • 8.3.3 编写 Kafka 监听器
    • 8.4 小结
  • 第 9 章 集成 Spring
    • 9.1 声明简单的集成流
      • 9.1.1 使用 XML 定义集成流
      • 9.1.2 在 Java 中配置集成流
      • 9.1.3 使用 Spring Integration 的 DSL 配置
    • 9.2 探索 Spring Integration
      • 9.2.1 消息通道
      • 9.2.2 过滤器
      • 9.2.3 转换器
      • 9.2.4 路由
      • 9.2.5 分割器
      • 9.2.6 服务激活器
      • 9.2.7 网关
      • 9.2.8 通道适配器
      • 9.2.9 端点模块
    • 9.3 创建 Email 集成流
    • 9.4 总结
  • 第三部分 响应式 Spring
  • 第 10 章 Reactor 介绍
    • 10.1 理解响应式编程
      • 10.1.1 定义响应式流
    • 10.2 Reactor
      • 10.2.1 图解响应式流
      • 10.2.2 添加 Reactor 依赖
    • 10.3 通用响应式操作实战
      • 10.3.1 创建响应式类型
      • 10.3.2 响应式类型结合
      • 10.3.3 转换和过滤响应式流
      • 10.3.4 对反应类型执行逻辑操作
    • 10.4 总结
  • 第 11 章 开发响应式 API
    • 11.1 使用 Spring WebFlux
      • 11.1.1 Spring WebFlux 介绍
      • 11.1.2 编写响应式 Controller
    • 11.2 定义函数式请求处理程序
    • 11.3 测试响应式 Controller
      • 11.3.1 测试 GET 请求
      • 11.3.2 测试 POST 请求
      • 11.3.3 使用线上服务器进行测试
    • 11.4 响应式消费 REST API
      • 11.4.1 通过 GET 方式获取资源
      • 11.4.2 通过 POST 方式发送资源
      • 11.4.3 删除资源
      • 11.4.4 处理请求错误
      • 11.4.5 请求转换
    • 11.5 保护响应式 web API
      • 11.5.1 配置响应式 Web 安全
      • 11.5.2 配置响应式用户信息服务
    • 11.6 总结
  • 第 12 章 响应式持久化数据
    • 12.1 理解 Spring Data 响应式历程
      • 12.1.1 Spring Data 响应式精髓
      • 12.1.2 在响应式与非响应式之间进行转换
      • 12.1.3 开发响应式库
    • 12.2 使用响应式 Cassandra 库
      • 12.2.1 开启 Spring Data Cassandra
      • 12.2.2 理解 Cassandra 数据模型
      • 12.2.3 Cassandra 持久化实体映射
      • 12.2.4 编写响应式 Cassandra 库
    • 12.3 编写响应式 MongoDB 库
      • 12.3.1 开启Spring Data MongonDB
      • 12.3.2 MongoDB 持久化实体映射
      • 12.3.3 编写响应式 MongoDB 库
    • 12.4 总结
  • 第四部分 云原生 Spring
  • 第 13 章 服务发现
    • 13.1深入思考微服务
    • 13.2 配置服务注册
      • 13.2.1 配置 Eureka
      • 13.2.2 扩展 Eureka
    • 13.3 注册并发现服务
      • 13.3.1 配置 Eureka 客户端属性
      • 13.3.2 消费服务
    • 13.4 总结
  • 第 14 章 配置管理
    • 14.1 共享配置
    • 14.2 运行配置服务器
      • 14.2.1 启动配置服务器
      • 14.2.2 填写配置库
    • 14.3 消费共享的配置
    • 14.4 服务应用程序和特定配置文件的属性
      • 14.4.1 服务特定应用程序的属性
      • 14.4.2 服务配置文件属性
    • 14.5 为配置的属性加密
      • 14.5.1 在 Git 中加密属性
      • 14.5.2 在 Vault 中存储密码
    • 14.6 远程刷新配置属性
      • 14.6.1 手动刷新配置属性
      • 14.6.2 自动刷新配置属性
    • 14.7 总结
  • 第 15 章 处理失败和时延
    • 15.1 了解断路器
    • 15.2 定义断路器
      • 15.2.1 缓解时延
      • 15.2.2 管理断路器阈值
    • 15.3 管理失败事件
      • 15.3.1 介绍 Hystrix 面板
      • 15.3.2 了解 Hystrix 线程池
    • 15.4 聚合多个 Hystrix 流
    • 15.5 总结
  • 第五部分 部署Spring
  • 第 16 章 使用 SpringBoot Actuator
    • 16.1 介绍 Actuator
      • 16.1.1 配置 Actuator 基本路径
      • 16.1.2 启用和禁用 Actuator 端点
    • 16.2 使用 Actuator 端点
      • 16.2.1 获取重要的应用程序信息
      • 16.2.2 查看配置详细信息
      • 16.2.3 查看应用程序活动
      • 16.2.4 利用运行时指标
    • 16.3 自定义 Actuator
      • 16.3.1 向 /info 端点提供信息
      • 16.3.2 自定义健康指标
      • 16.3.3 注册自定义指标
      • 16.3.4 创建自定义端点
    • 16.4 保护 Actuator
    • 16.5 总结
  • 第 17 章 管理 Spring
    • 17.1 使用 SpringBoot Admin
      • 17.1.1 创建 Admin 服务端
      • 17.1.2 注册 Admin 客户端
    • 17.2 深入 Admin 服务端
      • 17.2.1 查看普通应用程序运行状况和信息
      • 17.2.2 观察关键指标
      • 17.2.3 检查环境属性
      • 17.2.4 查看并设置 log 级别
      • 17.2.5 监控线程
      • 17.2.6 追踪 HTTP 请求
    • 17.3 保护 Admin 服务端
      • 17.3.1 在 Admin 服务端中启用登录
      • 17.3.2 使用 Actuator 进行认证
    • 17.4 总结
  • 第 18 章 使用 JMX 监控 Spring
    • 18.1 使用 Actuator MBean
    • 18.2 创建自己的 MBean
    • 18.3 发送通知
    • 18.4 总结
  • 第 19 章 部署 Spring
    • 19.1 权衡部署选项
    • 19.2 构建并部署 WAR 文件
    • 19.3 将 JAR 文件推送到 Cloud Foundry
    • 19.4 在 Docker 容器中运行 SpringBoot
    • 19.5 终章
    • 19.6 总结
由 GitBook 提供支持
在本页

这有帮助吗?

  1. 第 10 章 Reactor 介绍
  2. 10.3 通用响应式操作实战

10.3.2 响应式类型结合

上一页10.3.1 创建响应式类型下一页10.3.3 转换和过滤响应式流

最后更新于4年前

这有帮助吗?

你可能发现你需要将两种响应式类型以某种方式合并到一起。或者,在其他情况下,你可能需要将 Flux 分解成多个响应式类型。在本节中,我们将研究 Reactor 中 Flux 和 Mono 的结合和分解操作。

合并响应式类型

假设你有两个 Flux 流,并需要建立一个汇聚结果的 Flux,它会因为能够得到上流的 Flux 流,所以能够产生数据。为了将一个 Flux 与另一个合并,可以使用 mergeWith() 操作,如在图 10.6 展示的弹珠图一样:

例如,假设第一个 Flux 其值是电视和电影人物的名字,第二个 Flux 其值是食品的名称。下面的测试方法将展示如何使用 mergeWith() 方法合并两个 Flux 对象:

@Test
public void mergeFluxes() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500));

    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples")
        .delaySubscription(Duration.ofMillis(250))
        .delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

    StepVerifier.create(mergedFlux)
        .expectNext("Garfield")
        .expectNext("Lasagna")
        .expectNext("Kojak")
        .expectNext("Lollipops")
        .expectNext("Barbossa")
        .expectNext("Apples")
        .verifyComplete();
}

通常情况下,Flux 会尽可能快的快地发送数据。因此,需要在创建 Flux 的时候使用 delayElements() 操作,用来将数据发送速度减慢 —— 每 0.5s 发送一个数据。此外,你将 delaySubscription() 操作应用于 foodFlux,使得它在延迟 250ms 后才会发送数据,因此 foodFlux 将会在 characterFlux 之后执行。

合并这两个 Flux 对象后,新的合并后的 Flux 被创建。当 StepVerifier 订阅合并后的 Flux 时,它会依次订阅两个 Flux 源。

合并后的 Flux 发出的数据的顺序,与源发出的数据的时间顺序一致。由于两个 Flux 都被设置为固定频率发送数据,因此值会通过合并后的 Flux 交替出现 —— character...food...character...food 一直这样下去。如何其中任何一个 Flux 的发送时间被修改了的话,你可能会看到 2 个 charater 跟在 1 个 food 后面或是 2 个 food 跟在 1 个 character 后面的情况。

因为 mergeWith() 不能保证源之间的完美交替,所以可能需要考虑使用 zip() 操作。当两个 Flux 对象压缩在一起时,会产生一个新的 Flux,该 Flux 生成一个元组,其中元组包含来自每个源 Flux 的一个项。图 10.7 说明了如何将两个 Flux 对象压缩在一起。

为了看看 zip() 操作的执行情况,参考一下下面的测试方法,它把 character Flux 和 food Flux 压缩在了一起:

@Test
public void zipFluxes() {
    Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");

    Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux);

    StepVerifier.create(zippedFlux)
        .expectNextMatches(p ->
            p.getT1().equals("Garfield") &&
            p.getT2().equals("Lasagna"))
        .expectNextMatches(p ->
            p.getT1().equals("Kojak") &&
            p.getT2().equals("Lollipops"))
        .expectNextMatches(p ->
            p.getT1().equals("Barbossa") &&
            p.getT2().equals("Apples"))
        .verifyComplete();
}

注意,与 mergeWith() 不同的是,zip() 操作是一个静态的创建操作,通过它创建的 Flux 使 character 和 food 完美对齐。从压缩后的 Flux 发送出来的每个项目都是 Tuple2(包含两个对象的容器),其中包含每一个源 Flux 的数据。

如果你不想使用 Tuple2,而是想用一些使用其他类型,你可以提供给 zip() 你想产生任何对象的 Function 接口。

例如,以下的试验方法说明了如何压缩的 character Flux 和 food Flux,使得它产生 String 类型的的 Flux 对象:

@Test
public void zipFluxesToObject() {
    Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");

    Flux<String> zippedFlux = Flux.zip(characterFlux, foodFlux,
                                   (c, f) -> c + " eats " + f);

    StepVerifier.create(zippedFlux)
        .expectNext("Garfield eats Lasagna")
        .expectNext("Kojak eats Lollipops")
        .expectNext("Barbossa eats Apples")
        .verifyComplete();
}

给 zip() 的 Function 接口(这里给出一个 lambda 表达式)简单地把两个值连接成一句话,由压缩后的 Flux 进行数据发送。

选择第一个响应式类型进行发布

假设你有两个 Flux 对象,你只是想创建一个新的发送从第一个 Flux 产生值的 Flux,而不是将两个 Flux 合并在一起。如图 10.9 所示,first() 操作选择两个 Flux 对象的第一个对象然后输出它的值。

下面的测试方法创建一个 fast Flux 和 slow Flux(这里的 “slow” 的意思是它在订阅之后 100ms 才发布数据)。通过使用 first(),它创建了一个新的 Flux,将只会发布从第一个源 Flux 发布的数据:

(译者注:在 reactor 3.4 版本中,first() 方法已被废弃,并将在 3.5 版本中被移除,替代方法是 firstWithSignal(),选择第一个发出 Signal 的 Flux 对象)

@Test
public void firstFlux() {
    Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
        .delaySubscription(Duration.ofMillis(100));
    Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");

    // reactor 3.5: Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);
    Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);

    StepVerifier.create(firstFlux)
        .expectNext("hare")
        .expectNext("cheetah")
        .expectNext("squirrel")
        .verifyComplete();
}

在这种情况下,因为在 fast Flux 已经开始发布后 100ms,slow Flux 才开始发布数据,这样导致新创建的 Flux 将完全忽略 slow Flux,而只发布 fast flux 中的数据。

图 10.6 合并两个 Flux 会将它们的消息交织成一个新的 Flux
图 10.7 压缩两个 Flux 为一个 Flux
图 10.8 zip 操作的另一种形式导致从每个传入 Flux 的一个元素创建的消息 Flux
图 10.9 第一个操作选择第一个 Flux 来发送消息,然后仅从该 Flux 生成消息