12.1.2 在响应式与非响应式之间进行转换

在我们进一步研究,如何使用 Spring Data 编写响应式 Repository 之前,先来看一个经常被刻意回避的问题。那就是您可能正在使用一个关系型数据库,若把您的数据都迁移到 Spring Data 响应式编程支持的四个数据库类型之一,其实不太现实,对吗?那是不是根本就不能在您的应用程序中使用响应式编程呢?

尽管要获得响应式编程的全部好处,要求响应式模型是从前到后都是响应式的,包括在数据库级别也要是响应式的。但是在非响应式数据库上使用响应式编程仍然会有一些好处。即使你选择的数据库不支持非阻塞的响应式查询,您仍然能够阻塞式的获取数据,然后即时地将其转换为响应式的上游组件。

例如,假设您正在使用关系数据库,并使用 Spring Data JPA 进行持久化。您的 OrderRepository 可能有类似下面的接口:

List<Order> findByUser(User user);

对于给定的 User,此方法非响应式地返回包含所有 Order 的单实体 List<Order> 。当调用 findByUser() 时,会在执行查询时阻塞,等待结果被收集到列表中。因为 List 不是响应式类型,你无法执行 Flux 提供的任何操作。此外,如果调用者是一个 Controller,它也不能响应式地提高可扩展性。

对于使用 JPA 调用 Repository 上的阻塞性方法,您没有什么可改变的。但是,一旦收到数据库返回,您可以将非响应 List 转换为 Flux,这样后续就可以用响应式的方式处理了。要这样做,您只需使用 Flux.fromIterable():

List<Order> orders = repo.findByUser(someUser);
Flux<Order> orderFlux = Flux.fromIterable(orders);

同样,如果要根据 ID 获取单个 Order,可以立即将其转换为 Mono:

Order order = repo.findById(Long id);
Mono<Order> orderMono = Mono.just(order);

英文版原书代码示例中(第299页)缺少一个 =

使用 Mono.just() 或 Flux 的 fromIterable()fromArray()fromStream() 方法,可以把非响应式代码隔离在 Repository 层,并在应用程序的其他地方使用响应式编程。

反过来会怎么样呢?如果有一个 Mono 或 Flux,需要在非响应式的 JPA Repository 上调用 save(),是不是不可以呢?幸运的是可以,Mono 和 Flux 都可以提取并发布成具体的实体类型或 Iterable。

例如,假设 WebFlux Controller 接收一个 Mono<Taco> 对象,并需要在 Spring Data JPA 上使用 save() 方法保存它。这完全没问题,只要调用 Mono 上的 block() 方法来提取 Taco 对象:

Taco taco = tacoMono.block();
tacoRepo.save(taco);

顾名思义,block() 方法将执行阻塞式的提取操作。

至于从 Flux 中提取数据,可能需要使用 toIterable()。比如说,您得到了一个 Flux<Taco>,需要在 Spring Data JPA 上调用 saveAll()。下面的代码片段演示如何做到这一点:

Iterable<Taco> tacos = tacoFlux.toIterable();
tacoRepo.saveAll(tacos);

Mono.block() 一样, Flux.toIterable() 在收集已发布的所有对象时会阻塞。因为它们的阻塞性, Mono.block()Flux.toIterable() 应谨慎使用,要时刻记着使用它们会打破响应式编程模型。

另一种响应式的,可避免阻塞提取操作的办法是订阅 Mono 或 Flux,对每个对象在发布时执行相应的操作。例如,如果 Repository 是非响应式的,可以执行以下操作:

tacoFlux.subscribe(taco -> {
    tacoRepo.save(taco);
});

即使 Repository 对 save() 方法的调用仍然是非响应式的阻塞操作,但使用 subscribe() 处理 Flux 和 Mono 发布的数据, 是一种更适合和更标准的响应式方式。

关于如何使用非响应性 Repository 的讨论已经足够多了。现在,让我们开始使用 Spring Data 的响应式编程能力,为 Taco Cloud 应用程序创建真正的响应式 Repository。

最后更新于