Spring 响应式编程特性详解
Spring 框架提供了全面的响应式编程支持,主要通过 Spring WebFlux、Project Reactor 和 Spring Data R2DBC 等模块实现。以下是 Spring 响应式编程的主要特性:
1. 响应式核心组件
Project Reactor
- Mono: 表示 0-1 个元素的异步序列,用于返回单个结果或空结果
- Flux: 表示 0-N 个元素的异步序列,用于处理多个元素的数据流
- Scheduler: 提供灵活的线程调度机制,支持并行、顺序、定时等多种执行模式
Mono 使用示例
import reactor.core.publisher.Mono;
import java.time.Duration;
public class MonoExample {
public static void main(String[] args) {
// 创建一个包含单个值的 Mono
Mono<String> mono1 = Mono.just("Hello, Reactor!");
// 创建一个空的 Mono
Mono<String> mono2 = Mono.empty();
// 创建一个包含错误的 Mono
Mono<String> mono3 = Mono.error(new RuntimeException("Something went wrong"));
// 异步创建 Mono
Mono<String> mono4 = Mono.fromSupplier(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Async result";
});
// 订阅并处理结果
mono1.subscribe(
value -> System.out.println("Received: " + value),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Completed!")
);
// 使用 map 转换数据
Mono<Integer> mono5 = Mono.just(10)
.map(x -> x * 2)
.map(x -> x + 5);
// 使用 flatMap 进行异步转换
Mono<String> mono6 = Mono.just("user123")
.flatMap(userId -> fetchUserAsync(userId));
// 使用 filter 过滤数据
Mono<Integer> mono7 = Mono.just(15)
.filter(x -> x > 10)
.switchIfEmpty(Mono.just(0));
// 使用 defaultIfEmpty 提供默认值
Mono<String> mono8 = Mono.<String>empty()
.defaultIfEmpty("Default value");
// 使用 timeout 处理超时
Mono<String> mono9 = Mono.fromSupplier(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Delayed result";
}).timeout(Duration.ofMillis(1000), Mono.just("Timeout fallback"));
// 使用 retry 重试
Mono<String> mono10 = Mono.fromSupplier(() -> {
if (Math.random() > 0.7) {
throw new RuntimeException("Random error");
}
return "Success";
}).retry(3);
}
private static Mono<String> fetchUserAsync(String userId) {
return Mono.just("User: " + userId);
}
}
Flux 使用示例
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
public class FluxExample {
public static void main(String[] args) {
// 创建一个包含多个值的 Flux
Flux<String> flux1 = Flux.just("A", "B", "C", "D", "E");
// 从集合创建 Flux
List<String> list = Arrays.asList("X", "Y", "Z");
Flux<String> flux2 = Flux.fromIterable(list);
// 创建一个范围的 Flux
Flux<Integer> flux3 = Flux.range(1, 10);
// 创建一个间隔的 Flux
Flux<Long> flux4 = Flux.interval(Duration.ofSeconds(1));
// 订阅并处理结果
flux1.subscribe(
value -> System.out.println("Received: " + value),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Completed!")
);
// 使用 map 转换数据
Flux<Integer> flux5 = Flux.range(1, 5)
.map(x -> x * 2);
// 使用 filter 过滤数据
Flux<Integer> flux6 = Flux.range(1, 10)
.filter(x -> x % 2 == 0);
// 使用 flatMap 进行异步转换
Flux<String> flux7 = Flux.just("user1", "user2", "user3")
.flatMap(userId -> fetchUserDetailsAsync(userId));
// 使用 collectList 将 Flux 转换为 Mono<List>
Mono<List<Integer>> mono1 = Flux.range(1, 5)
.collectList();
// 使用 reduce 聚合数据
Mono<Integer> mono2 = Flux.range(1, 5)
.reduce((a, b) -> a + b);
// 使用 buffer 批量处理
Flux<List<Integer>> flux8 = Flux.range(1, 10)
.buffer(3);
// 使用 window 分组处理
Flux<Flux<Integer>> flux9 = Flux.range(1, 10)
.window(3);
// 使用 merge 合并多个 Flux
Flux<Integer> flux10 = Flux.merge(
Flux.range(1, 3),
Flux.range(4, 3),
Flux.range(7, 3)
);
// 使用 zip 组合多个 Flux
Flux<String> flux11 = Flux.zip(
Flux.just("A", "B", "C"),
Flux.just(1, 2, 3),
Flux.just(true, false, true)
).map(tuple -> tuple.getT1() + "-" + tuple.getT2() + "-" + tuple.getT3());
// 使用 take 和 skip
Flux<Integer> flux12 = Flux.range(1, 10)
.skip(2)
.take(5);
// 使用 distinct 去重
Flux<Integer> flux13 = Flux.just(1, 2, 2, 3, 3, 3, 4)
.distinct();
// 使用 distinctUntilChanged 去除连续重复
Flux<Integer> flux14 = Flux.just(1, 1, 2, 2, 2, 3, 3)
.distinctUntilChanged();
// 使用 throttleFirst 限流
Flux<Integer> flux15 = Flux.interval(Duration.ofMillis(100))
.map(Long::intValue)
.throttleFirst(Duration.ofMillis(500));
// 使用 delayElements 延迟发射
Flux<Integer> flux16 = Flux.range(1, 5)
.delayElements(Duration.ofMillis(500));
// 使用 doOnNext 和 doOnError 添加副作用
Flux<Integer> flux17 = Flux.range(1, 5)
.doOnNext(value -> System.out.println("Processing: " + value))
.doOnError(error -> System.err.println("Error: " + error))
.doOnComplete(() -> System.out.println("All done!"));
// 使用 onErrorResume 错误恢复
Flux<String> flux18 = Flux.just("A", "B", "C")
.map(s -> {
if (s.equals("B")) {
throw new RuntimeException("Error on B");
}
return s;
})
.onErrorResume(error -> Flux.just("Fallback1", "Fallback2"));
// 使用 onErrorReturn 返回默认值
Flux<String> flux19 = Flux.just("A", "B", "C")
.map(s -> {
if (s.equals("B")) {
throw new RuntimeException("Error on B");
}
return s;
})
.onErrorReturn("Default");
}
private static Flux<String> fetchUserDetailsAsync(String userId) {
return Flux.just(userId + "-detail1", userId + "-detail2");
}
}
Scheduler 使用示例
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class SchedulerExample {
public static void main(String[] args) throws InterruptedException {
// 使用 Schedulers.parallel() 并行处理
Flux.range(1, 10)
.parallel()
.runOn(Schedulers.parallel())
.map(i -> {
System.out.println("Processing " + i + " on thread: " + Thread.currentThread().getName());
return i * 2;
})
.sequential()
.subscribe();
Thread.sleep(1000);
// 使用 Schedulers.elastic() 处理 I/O 操作
Flux.just("file1.txt", "file2.txt", "file3.txt")
.flatMap(filename ->
Mono.fromCallable(() -> readFile(filename))
.subscribeOn(Schedulers.elastic())
)
.subscribe(content -> System.out.println("Content: " + content));
Thread.sleep(1000);
// 使用 Schedulers.single() 单线程执行
Flux.range(1, 5)
.publishOn(Schedulers.single())
.map(i -> {
System.out.println("Single thread: " + Thread.currentThread().getName());
return i;
})
.subscribe();
Thread.sleep(1000);
// 使用 Schedulers.boundedElastic() 有界弹性调度器
Flux.range(1, 20)
.flatMap(i ->
Mono.fromCallable(() -> heavyTask(i))
.subscribeOn(Schedulers.boundedElastic())
)
.subscribe();
Thread.sleep(2000);
}
private static String readFile(String filename) {
// 模拟文件读取
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Content of " + filename;
}
private static Integer heavyTask(int i) {
// 模拟耗时任务
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i * i;
}
}
背压处理
- 实现了 Reactive Streams 规范,提供完善的背压机制
- 支持多种背压策略:缓冲、丢弃、最新值等
- 防止快速生产者压垮慢速消费者
背压处理示例
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
// 使用 onBackpressureBuffer 缓冲策略
Flux.range(1, 1000)
.onBackpressureBuffer(10) // 缓冲最多10个元素
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.single())
.subscribe(
value -> {
System.out.println("Processing: " + value);
try {
Thread.sleep(100); // 模拟慢速消费者
} catch (InterruptedException e) {
e.printStackTrace();
}
},
error -> System.err.println("Error: " + error)
);
Thread.sleep(2000);
// 使用 onBackpressureDrop 丢弃策略
Flux.range(1, 1000)
.onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.single())
.subscribe(
value -> {
System.out.println("Processing: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
error -> System.err.println("Error: " + error)
);
Thread.sleep(2000);
// 使用 onBackpressureLatest 保留最新值策略
Flux.interval(Duration.ofMillis(10))
.onBackpressureLatest()
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.single())
.subscribe(
value -> {
System.out.println("Latest: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
error -> System.err.println("Error: " + error)
);
Thread.sleep(2000);
// 使用 onBackpressureError 错误策略
Flux.range(1, 1000)
.onBackpressureError()
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.single())
.subscribe(
value -> {
System.out.println("Processing: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
error -> System.err.println("Backpressure error: " + error)
);
Thread.sleep(2000);
}
}
2. Spring WebFlux 特性
非阻塞 Web 框架
- 完全非阻塞、事件驱动的架构
- 支持 Reactive Streams 背压
- 适用于高并发、低延迟场景
多种运行时支持
- Netty 作为默认服务器
- 支持 Servlet 3.1+ 容器(如 Tomcat、Jetty)
- 支持 Undertow 作为服务器
两种编程模型(注解式与函数式)
- 注解式控制器(类似 Spring MVC)
- 函数式路由(RouterFunction 和 HandlerFunction)
- 轻量级、函数式的开发方式
注解式控制器示例
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
// 获取所有用户
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll();
}
// 根据 ID 获取用户
@GetMapping("/{id}")
public Mono<User> getUserById(@PathVariable String id) {
return userService.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found")));
}
// 创建用户
@PostMapping
public Mono<User> createUser(@RequestBody User user) {
return userService.save(user);
}
// 更新用户
@PutMapping("/{id}")
public Mono<User> updateUser(@PathVariable String id, @RequestBody User user) {
return userService.update(id, user);
}
// 删除用户
@DeleteMapping("/{id}")
public Mono<Void> deleteUser(@PathVariable String id) {
return userService.deleteById(id);
}
// 批量创建用户
@PostMapping("/batch")
public Flux<User> createUsers(@RequestBody Flux<User> users) {
return userService.saveAll(users);
}
// 搜索用户
@GetMapping("/search")
public Flux<User> searchUsers(
@RequestParam(required = false) String name,
@RequestParam(required = false) String email
) {
return userService.search(name, email);
}
// 分页查询
@GetMapping("/page")
public Flux<User> getUsersByPage(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "10") int size
) {
return userService.findByPage(page, size);
}
}
/**
 * User entity: a plain mutable bean with id, name, e-mail and age.
 */
class User {

    private String id;
    private String name;
    private String email;
    private Integer age;

    /** Creates a user with every field left {@code null}. */
    public User() {
    }

    /** Creates a fully populated user. */
    public User(String id, String name, String email, Integer age) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.age = age;
    }

    // --- id ---

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // --- name ---

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // --- email ---

    public String getEmail() {
        return this.email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    // --- age ---

    public Integer getAge() {
        return this.age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
/**
 * Unchecked exception signalling that a requested user does not exist.
 */
class UserNotFoundException extends RuntimeException {

    /**
     * @param message human-readable description, exposed through {@link #getMessage()}
     */
    public UserNotFoundException(String message) {
        super(message);
    }
}
函数式路由示例
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
/**
 * Functional routing table for the user API; each route delegates to {@link UserHandler}.
 */
@Configuration
public class UserRouter {

    private final UserHandler userHandler;

    public UserRouter(UserHandler userHandler) {
        this.userHandler = userHandler;
    }

    /**
     * Builds the router function for {@code /api/users}.
     *
     * <p>Routes are evaluated in registration order and the first match wins, so the
     * fixed-path GET routes ({@code /search}, {@code /page}) are registered before the
     * {@code /{id}} pattern. In the opposite order {@code /api/users/search} would be
     * handled as a lookup for the id {@code "search"}.
     *
     * @return the composed router function
     */
    @Bean
    public RouterFunction<ServerResponse> userRoutes() {
        return RouterFunctions.route()
            // List all users.
            .GET("/api/users", userHandler::getAllUsers)
            // Search users (must precede the {id} pattern).
            .GET("/api/users/search", userHandler::searchUsers)
            // Paged listing (must precede the {id} pattern).
            .GET("/api/users/page", userHandler::getUsersByPage)
            // Fetch one user by id.
            .GET("/api/users/{id}", userHandler::getUserById)
            // Create a user.
            .POST("/api/users", userHandler::createUser)
            // Update a user.
            .PUT("/api/users/{id}", userHandler::updateUser)
            // Delete a user.
            .DELETE("/api/users/{id}", userHandler::deleteUser)
            .build();
    }
}
// 处理器类
@Component
class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
Flux<User> users = userService.findAll();
return ServerResponse.ok().body(users, User.class);
}
public Mono<ServerResponse> getUserById(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> createUser(ServerRequest request) {
Mono<User> userMono = request.bodyToMono(User.class);
return userMono.flatMap(user ->
userService.save(user)
.flatMap(savedUser -> ServerResponse.ok().bodyValue(savedUser))
);
}
public Mono<ServerResponse> updateUser(ServerRequest request) {
String id = request.pathVariable("id");
Mono<User> userMono = request.bodyToMono(User.class);
return userMono.flatMap(user ->
userService.update(id, user)
.flatMap(updatedUser -> ServerResponse.ok().bodyValue(updatedUser))
.switchIfEmpty(ServerResponse.notFound().build())
);
}
public Mono<ServerResponse> deleteUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.deleteById(id)
.then(ServerResponse.noContent().build())
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> searchUsers(ServerRequest request) {
String name = request.queryParam("name").orElse(null);
String email = request.queryParam("email").orElse(null);
Flux<User> users = userService.search(name, email);
return ServerResponse.ok().body(users, User.class);
}
public Mono<ServerResponse> getUsersByPage(ServerRequest request) {
int page = Integer.parseInt(request.queryParam("page").orElse("0"));
int size = Integer.parseInt(request.queryParam("size").orElse("10"));
Flux<User> users = userService.findByPage(page, size);
return ServerResponse.ok().body(users, User.class);
}
}
WebFlux 过滤器示例
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
/**
 * WebFilter that prints the method and path of each request and, once the rest of the
 * chain completes, the response status and elapsed time.
 */
@Component
public class LoggingFilter implements WebFilter {

    /**
     * Logs the request, delegates to the chain, then logs the outcome.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String path = exchange.getRequest().getPath().value();
        // String.valueOf avoids a NullPointerException when getMethod() returns null,
        // which older Spring versions do for non-standard HTTP methods.
        String method = String.valueOf(exchange.getRequest().getMethod());
        System.out.println("Request: " + method + " " + path);

        long startTime = System.currentTimeMillis();
        return chain.filter(exchange)
            .doOnSuccess(aVoid -> {
                long duration = System.currentTimeMillis() - startTime;
                System.out.println("Response: " + exchange.getResponse().getStatusCode() +
                    " - Duration: " + duration + "ms");
            })
            .doOnError(error -> System.err.println("Error: " + error.getMessage()));
    }
}
/**
 * WebFilter that requires a bearer token in the {@code Authorization} header.
 *
 * <p>A missing, malformed or invalid token is answered with 401 Unauthorized and a
 * {@code WWW-Authenticate: Bearer} challenge. 403 Forbidden is not used for a bad token,
 * because it means the caller is authenticated but not allowed.
 */
@Component
public class AuthenticationFilter implements WebFilter {

    /** Authorization scheme prefix, including the separating space. */
    private static final String BEARER_PREFIX = "Bearer ";

    /**
     * Lets the request through only when it carries a token accepted by
     * {@link #validateToken(String)}.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return the downstream completion signal, or the completion of the 401 response
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String header = exchange.getRequest().getHeaders().getFirst("Authorization");

        // HTTP authentication scheme names are case-insensitive, so "bearer xyz" is accepted too.
        boolean isBearer = header != null
            && header.regionMatches(true, 0, BEARER_PREFIX, 0, BEARER_PREFIX.length());
        if (!isBearer) {
            return reject(exchange);
        }

        // Trim so that a header consisting of the scheme plus whitespace counts as empty.
        String jwtToken = header.substring(BEARER_PREFIX.length()).trim();
        if (!validateToken(jwtToken)) {
            return reject(exchange);
        }
        return chain.filter(exchange);
    }

    /** Completes the response with 401 and a Bearer challenge. */
    private Mono<Void> reject(ServerWebExchange exchange) {
        exchange.getResponse().setStatusCode(org.springframework.http.HttpStatus.UNAUTHORIZED);
        exchange.getResponse().getHeaders().set("WWW-Authenticate", "Bearer");
        return exchange.getResponse().setComplete();
    }

    /**
     * Placeholder validation: accepts any non-empty token. A real application must verify
     * the token's signature and claims with a JWT library.
     *
     * @param token token text without the scheme prefix
     * @return {@code true} when the token is non-empty
     */
    private boolean validateToken(String token) {
        return !token.isEmpty();
    }
}
全局异常处理示例
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@RestControllerAdvice
public class GlobalExceptionHandler {
// 处理用户未找到异常
@ExceptionHandler(UserNotFoundException.class)
public Mono<ServerResponse> handleUserNotFound(
UserNotFoundException ex,
ServerWebExchange exchange
) {
ErrorResponse error = new ErrorResponse(
"USER_NOT_FOUND",
ex.getMessage(),
exchange.getRequest().getPath().value()
);
return ServerResponse
.status(org.springframework.http.HttpStatus.NOT_FOUND)
.bodyValue(error);
}
// 处理验证异常
@ExceptionHandler(MethodArgumentNotValidException.class)
public Mono<ServerResponse> handleValidationException(
MethodArgumentNotValidException ex,
ServerWebExchange exchange
) {
ErrorResponse error = new ErrorResponse(
"VALIDATION_ERROR",
"Invalid request parameters",
exchange.getRequest().getPath().value()
);
return ServerResponse
.status(org.springframework.http.HttpStatus.BAD_REQUEST)
.bodyValue(error);
}
// 处理通用异常
@ExceptionHandler(Exception.class)
public Mono<ServerResponse> handleGenericException(
Exception ex,
ServerWebExchange exchange
) {
ErrorResponse error = new ErrorResponse(
"INTERNAL_SERVER_ERROR",
ex.getMessage(),
exchange.getRequest().getPath().value()
);
return ServerResponse
.status(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(error);
}
}
/**
 * Error payload returned to clients: an error code, a message, the request path and the
 * creation time in epoch milliseconds.
 */
class ErrorResponse {

    private String code;
    private String message;
    private String path;
    private long timestamp;

    /**
     * Creates an error response stamped with the current system time.
     *
     * @param code    error code
     * @param message error description
     * @param path    request path the error refers to
     */
    public ErrorResponse(String code, String message, String path) {
        this.code = code;
        this.message = message;
        this.path = path;
        this.timestamp = System.currentTimeMillis();
    }

    // --- code ---

    public String getCode() {
        return this.code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    // --- message ---

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    // --- path ---

    public String getPath() {
        return this.path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    // --- timestamp (epoch milliseconds) ---

    public long getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
3. 响应式数据访问
Spring Data R2DBC
- 响应式关系型数据库访问
- 完全非阻塞的数据库操作
- 支持主流关系型数据库(PostgreSQL、MySQL、H2 等)
R2DBC 实体类示例
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.data.relational.core.mapping.Column;
/**
 * R2DBC entity mapped to the {@code users} table.
 *
 * <p>{@code LocalDateTime} is written fully-qualified because this snippet's import list
 * does not include {@code java.time.LocalDateTime}.
 */
@Table("users")
public class User {

    @Id
    private Long id;

    @Column("username")
    private String username;

    @Column("email")
    private String email;

    @Column("age")
    private Integer age;

    @Column("created_at")
    private java.time.LocalDateTime createdAt;

    /** Creates an empty user; every field is left {@code null}. */
    public User() {}

    /**
     * Creates a new user and stamps {@code createdAt} with the current local date-time.
     * The id is left {@code null}.
     *
     * @param username login name
     * @param email    e-mail address
     * @param age      age
     */
    public User(String username, String email, Integer age) {
        this.username = username;
        this.email = email;
        this.age = age;
        this.createdAt = java.time.LocalDateTime.now();
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    public java.time.LocalDateTime getCreatedAt() { return createdAt; }
    public void setCreatedAt(java.time.LocalDateTime createdAt) { this.createdAt = createdAt; }
}
R2DBC Repository 示例
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Reactive repository for {@link User} rows keyed by {@code Long}. The query methods are
 * declared by name only; no implementation is written here.
 */
@Repository
public interface UserRepository extends R2dbcRepository<User, Long> {

    /** Finds the user with exactly this username. */
    Mono<User> findByUsername(String username);

    /** Finds the user with exactly this e-mail address. */
    Mono<User> findByEmail(String email);

    /** Finds users whose age lies between {@code minAge} and {@code maxAge}. */
    Flux<User> findByAgeBetween(Integer minAge, Integer maxAge);

    /** Finds users whose username contains {@code keyword}. */
    Flux<User> findByUsernameContaining(String keyword);

    /** Tells whether a user with this username exists. */
    Mono<Boolean> existsByUsername(String username);

    /** Counts the users having exactly this age. */
    Mono<Long> countByAge(Integer age);
}
R2DBC Service 示例
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Reactive user service on top of {@link UserRepository}, showing both declarative
 * ({@code @Transactional}) and programmatic ({@code TransactionalOperator}) transactions.
 */
@Service
public class UserService {

    private final UserRepository userRepository;
    private final TransactionalOperator rxtx;

    public UserService(UserRepository userRepository, TransactionalOperator rxtx) {
        this.userRepository = userRepository;
        this.rxtx = rxtx;
    }

    /** Returns all users. */
    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    /** Returns the user with the given id, or an empty Mono when there is none. */
    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }

    /** Saves a user. */
    @Transactional
    public Mono<User> save(User user) {
        return userRepository.save(user);
    }

    /** Saves every user emitted by {@code users}. */
    @Transactional
    public Flux<User> saveAll(Flux<User> users) {
        return userRepository.saveAll(users);
    }

    /**
     * Copies username, e-mail and age from {@code user} onto the stored user with the given
     * id and saves it.
     *
     * @return the saved user, or an empty Mono when no user has that id
     */
    @Transactional
    public Mono<User> update(Long id, User user) {
        return userRepository.findById(id)
            .flatMap(existingUser -> {
                existingUser.setUsername(user.getUsername());
                existingUser.setEmail(user.getEmail());
                existingUser.setAge(user.getAge());
                return userRepository.save(existingUser);
            });
    }

    /** Deletes the user with the given id. */
    @Transactional
    public Mono<Void> deleteById(Long id) {
        return userRepository.deleteById(id);
    }

    /** Finds a user by exact username. */
    public Mono<User> findByUsername(String username) {
        return userRepository.findByUsername(username);
    }

    /** Finds a user by exact e-mail address. */
    public Mono<User> findByEmail(String email) {
        return userRepository.findByEmail(email);
    }

    /** Finds users whose age lies between the two bounds. */
    public Flux<User> findByAgeBetween(Integer minAge, Integer maxAge) {
        return userRepository.findByAgeBetween(minAge, maxAge);
    }

    /**
     * Searches users by optional username and e-mail fragments.
     *
     * <p>The username fragment is matched by the repository; the e-mail fragment is applied
     * as an in-memory filter. Users without an e-mail never match an e-mail fragment.
     *
     * @param name  username fragment, or {@code null} for no username filter
     * @param email e-mail fragment, or {@code null} for no e-mail filter
     */
    public Flux<User> search(String name, String email) {
        Flux<User> candidates = (name != null)
            ? userRepository.findByUsernameContaining(name)
            : userRepository.findAll();
        if (email == null) {
            return candidates;
        }
        return candidates.filter(user -> emailContains(user, email));
    }

    /**
     * Returns one page of users by skipping {@code page * size} rows of the full result
     * and taking the next {@code size}.
     *
     * @param page zero-based page index
     * @param size number of users per page
     */
    public Flux<User> findByPage(int page, int size) {
        // Multiply as long: page * size in int arithmetic can overflow for large pages.
        long offset = (long) page * size;
        return userRepository.findAll()
            .skip(offset)
            .take(size);
    }

    /**
     * Programmatic-transaction example: loads both users and saves them inside a single
     * transaction applied with {@code TransactionalOperator}.
     *
     * @return a Mono that completes when both saves have finished
     */
    public Mono<Void> transferUser(Long fromId, Long toId) {
        return userRepository.findById(fromId)
            .flatMap(fromUser -> userRepository.findById(toId)
                .flatMap(toUser -> {
                    // Business logic would go here.
                    return userRepository.save(fromUser)
                        .then(userRepository.save(toUser));
                })
            )
            // Drop the saved user so the chain matches the declared Mono<Void>.
            .then()
            .as(rxtx::transactional);
    }

    /** Null-safe check whether the user's e-mail contains the fragment. */
    private static boolean emailContains(User user, String fragment) {
        String address = user.getEmail();
        return address != null && address.contains(fragment);
    }
}
R2DBC 配置示例
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.r2dbc.core.DatabaseClient;
@Configuration
public class R2dbcConfig extends AbstractR2dbcConfiguration {
@Override
protected String connectionUrl() {
return "r2dbc:postgresql://localhost:5432/reactor_db";
}
@Override
protected String username() {
return "postgres";
}
@Override
protected String password() {
return "password";
}
@Bean
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.create(connectionFactory);
}
@Bean
public TransactionalOperator transactionalOperator(ConnectionFactory connectionFactory) {
return TransactionalOperator.create(R2dbcTransactionManager.create(connectionFactory));
}
}
响应式 NoSQL 支持
- Spring Data MongoDB 提供响应式模板和仓库
- Spring Data Redis 支持响应式操作
- Spring Data Cassandra 提供响应式驱动
MongoDB 响应式示例
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * MongoDB document stored in the {@code products} collection.
 */
@Document(collection = "products")
public class Product {

    @Id
    private String id;

    private String name;
    private Double price;
    private String category;
    private Integer stock;

    /** Creates an empty product; every field is left {@code null}. */
    public Product() {
    }

    /** Creates a product without an id. */
    public Product(String name, Double price, String category, Integer stock) {
        this.name = name;
        this.price = price;
        this.category = category;
        this.stock = stock;
    }

    // --- id ---

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // --- name ---

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // --- price ---

    public Double getPrice() {
        return this.price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    // --- category ---

    public String getCategory() {
        return this.category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    // --- stock ---

    public Integer getStock() {
        return this.stock;
    }

    public void setStock(Integer stock) {
        this.stock = stock;
    }
}
/**
 * Reactive MongoDB repository for {@link Product} documents keyed by {@code String}. The
 * query methods are declared by name only; no implementation is written here.
 */
@Repository
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {

    /** Finds products in the given category. */
    Flux<Product> findByCategory(String category);

    /** Finds products whose price lies between {@code minPrice} and {@code maxPrice}. */
    Flux<Product> findByPriceBetween(Double minPrice, Double maxPrice);

    /** Finds products whose name contains {@code keyword}. */
    Flux<Product> findByNameContaining(String keyword);

    /** Finds products whose stock is below {@code threshold}. */
    Flux<Product> findByStockLessThan(Integer threshold);
}
// MongoDB Service
@Service
public class ProductService {
private final ProductRepository productRepository;
public ProductService(ProductRepository productRepository) {
this.productRepository = productRepository;
}
// 获取所有产品
public Flux<Product> findAll() {
return productRepository.findAll();
}
// 根据 ID 获取产品
public Mono<Product> findById(String id) {
return productRepository.findById(id);
}
// 保存产品
public Mono<Product> save(Product product) {
return productRepository.save(product);
}
// 根据分类查找
public Flux<Product> findByCategory(String category) {
return productRepository.findByCategory(category);
}
// 根据价格范围查找
public Flux<Product> findByPriceRange(Double minPrice, Double maxPrice) {
return productRepository.findByPriceBetween(minPrice, maxPrice);
}
}
Redis 响应式示例
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class RedisService {
private final ReactiveRedisTemplate<String, Object> redisTemplate;
private final ReactiveStringRedisTemplate stringRedisTemplate;
public RedisService(
ReactiveRedisTemplate<String, Object> redisTemplate,
ReactiveStringRedisTemplate stringRedisTemplate
) {
this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate;
}
// 设置键值对
public Mono<Boolean> set(String key, Object value) {
return redisTemplate.opsForValue().set(key, value);
}
// 设置键值对并指定过期时间
public Mono<Boolean> set(String key, Object value, long timeout, java.util.concurrent.TimeUnit unit) {
return redisTemplate.opsForValue().set(key, value, timeout, unit);
}
// 获取值
public Mono<Object> get(String key) {
return redisTemplate.opsForValue().get(key);
}
// 删除键
public Mono<Boolean> delete(String key) {
return redisTemplate.delete(key).map(count -> count > 0);
}
// 检查键是否存在
public Mono<Boolean> exists(String key) {
return redisTemplate.hasKey(key);
}
// 设置过期时间
public Mono<Boolean> expire(String key, long timeout, java.util.concurrent.TimeUnit unit) {
return redisTemplate.expire(key, timeout, unit);
}
// 获取过期时间
public Mono<Long> getExpire(String key) {
return redisTemplate.getExpire(key);
}
// 列表操作 - 左推
public Mono<Long> lpush(String key, Object value) {
return redisTemplate.opsForList().leftPush(key, value);
}
// 列表操作 - 右推
public Mono<Long> rpush(String key, Object value) {
return redisTemplate.opsForList().rightPush(key, value);
}
// 列表操作 - 左弹
public Mono<Object> lpop(String key) {
return redisTemplate.opsForList().leftPop(key);
}
// 列表操作 - 范围查询
public Flux<Object> lrange(String key, long start, long end) {
return redisTemplate.opsForList().range(key, start, end);
}
// 哈希操作 - 设置字段
public Mono<Boolean> hset(String key, String field, Object value) {
return redisTemplate.opsForHash().put(key, field, value);
}
// 哈希操作 - 获取字段
public Mono<Object> hget(String key, String field) {
return redisTemplate.opsForHash().get(key, field);
}
// 哈希操作 - 获取所有字段
public Flux<Map.Entry<String, Object>> hgetall(String key) {
return redisTemplate.opsForHash().entries(key);
}
// 集合操作 - 添加成员
public Mono<Long> sadd(String key, Object value) {
return redisTemplate.opsForSet().add(key, value);
}
// 集合操作 - 获取所有成员
public Flux<Object> smembers(String key) {
return redisTemplate.opsForSet().members(key);
}
// 有序集合操作 - 添加成员
public Mono<Boolean> zadd(String key, Object value, double score) {
return redisTemplate.opsForZSet().add(key, value, score);
}
// 有序集合操作 - 范围查询
public Flux<Object> zrange(String key, long start, long end) {
return redisTemplate.opsForZSet().range(key, start, end);
}
// 发布订阅 - 发布消息
public Mono<Long> publish(String channel, String message) {
return stringRedisTemplate.convertAndSend(channel, message);
}
// 发布订阅 - 订阅频道
public Flux<String> subscribe(String channel) {
return stringRedisTemplate.listenToChannel(channel)
.map(message -> message.getMessage());
}
}
响应式事务管理
- 支持响应式事务(ReactiveTransactionManager)
- 编程模型与传统事务管理保持一致(同样使用 @Transactional),但底层需要 ReactiveTransactionManager,而不是 PlatformTransactionManager
- 支持编程式和声明式事务
响应式事务示例
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Examples of declarative ({@code @Transactional}) and programmatic
 * ({@code TransactionalOperator}) reactive transactions.
 */
@Service
public class TransactionService {

    private final UserRepository userRepository;
    private final OrderRepository orderRepository;
    private final TransactionalOperator rxtx;

    public TransactionService(
        UserRepository userRepository,
        OrderRepository orderRepository,
        TransactionalOperator rxtx
    ) {
        this.userRepository = userRepository;
        this.orderRepository = orderRepository;
        this.rxtx = rxtx;
    }

    /**
     * Declarative transaction: saves the user, then the order linked to the saved user's id.
     *
     * @param user  user to save
     * @param order order to save; its user id is overwritten with the saved user's id
     * @return a Mono that completes when both saves have finished
     */
    @Transactional
    public Mono<Void> createUserWithOrder(User user, Order order) {
        return userRepository.save(user)
            .flatMap(savedUser -> {
                order.setUserId(savedUser.getId());
                return orderRepository.save(order);
            })
            .then();
    }

    /**
     * Programmatic transaction: moves {@code amount} from one user's balance to another's
     * and saves both users inside one transaction.
     *
     * @param fromUserId id of the user to debit
     * @param toUserId   id of the user to credit
     * @param amount     amount to move
     * @return a Mono that completes when both saves have finished
     */
    public Mono<Void> transferMoney(Long fromUserId, Long toUserId, Double amount) {
        // NOTE(review): Double is imprecise for monetary amounts; BigDecimal would be safer,
        // but that would change this method's signature.
        return userRepository.findById(fromUserId)
            .flatMap(fromUser -> userRepository.findById(toUserId)
                .flatMap(toUser -> {
                    // Debit the sender.
                    fromUser.setBalance(fromUser.getBalance() - amount);
                    // Credit the receiver.
                    toUser.setBalance(toUser.getBalance() + amount);
                    // Persist both sides.
                    return userRepository.save(fromUser)
                        .then(userRepository.save(toUser));
                })
            )
            .as(rxtx::transactional)
            .then();
    }

    /**
     * Multi-step operation in a single transaction: saves the user, then the order, then
     * processes the payment.
     *
     * @return a Mono that completes when all three steps have finished
     */
    @Transactional
    public Mono<Void> complexOperation(User user, Order order, Payment payment) {
        return userRepository.save(user)
            .flatMap(savedUser -> {
                order.setUserId(savedUser.getId());
                return orderRepository.save(order);
            })
            .flatMap(savedOrder -> {
                payment.setOrderId(savedOrder.getId());
                return processPayment(payment);
            })
            .then();
    }

    /**
     * Placeholder payment step; returns the payment unchanged.
     *
     * <p>Deliberately not annotated with {@code @Transactional}: Spring's proxy-based
     * transaction support ignores the annotation on private methods and on calls made from
     * within the same object, so it would have no effect here. This step runs inside the
     * transaction started by {@link #complexOperation}.
     */
    private Mono<Payment> processPayment(Payment payment) {
        return Mono.just(payment);
    }

    /** Saves every user emitted by {@code users} within one transaction. */
    @Transactional
    public Flux<User> batchCreateUsers(Flux<User> users) {
        return users.flatMap(userRepository::save);
    }
}
4. 响应式安全特性
Spring Security 5.x 响应式支持
- 完全响应式的认证和授权
- 支持 JWT、OAuth2 等认证方式
- 响应式方法级安全控制
Spring Security 配置示例
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
/**
 * Reactive Spring Security configuration: URL-based authorization rules, an in-memory user
 * store and a BCrypt password encoder.
 */
@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig {

    /**
     * Builds the security filter chain.
     *
     * @param http the security builder supplied by Spring
     * @return the configured filter chain
     */
    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
        return http
            .authorizeExchange(exchanges -> exchanges
                // Public endpoints.
                .pathMatchers("/api/public/**", "/auth/**").permitAll()
                // Administrator endpoints.
                .pathMatchers("/api/admin/**").hasRole("ADMIN")
                // User endpoints.
                .pathMatchers("/api/users/**").hasAnyRole("USER", "ADMIN")
                // Everything else requires an authenticated caller.
                .anyExchange().authenticated()
            )
            // CSRF protection is switched off (REST API).
            .csrf(csrf -> csrf.disable())
            // CORS handling by Spring Security is switched off.
            .cors(cors -> cors.disable())
            // Answer with a bare status code: 401 when unauthenticated, 403 when access is denied.
            .exceptionHandling(exceptions -> exceptions
                .authenticationEntryPoint((exchange, ex) -> {
                    exchange.getResponse().setStatusCode(org.springframework.http.HttpStatus.UNAUTHORIZED);
                    return exchange.getResponse().setComplete();
                })
                .accessDeniedHandler((exchange, ex) -> {
                    exchange.getResponse().setStatusCode(org.springframework.http.HttpStatus.FORBIDDEN);
                    return exchange.getResponse().setComplete();
                })
            )
            .build();
    }

    /**
     * In-memory user store with one administrator and one regular user.
     *
     * <p>Each user gets its own builder from {@code User.withUsername(...)}; {@code User}
     * has no {@code withPasswordEncoder(PasswordEncoder)} factory. Passwords are encoded
     * explicitly with the shared encoder. The hard-coded credentials are for demonstration
     * only; a real application should load users from a database.
     */
    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        PasswordEncoder encoder = passwordEncoder();
        return new MapReactiveUserDetailsService(
            User.withUsername("admin")
                .password(encoder.encode("admin123"))
                .roles("ADMIN")
                .build(),
            User.withUsername("user")
                .password(encoder.encode("user123"))
                .roles("USER")
                .build()
        );
    }

    /** BCrypt password encoder shared by the application. */
    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }
}
JWT 认证示例
import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
/**
 * Reactive authentication manager that turns a bearer token, carried as the credentials of the
 * incoming {@link Authentication}, into an authenticated {@link UsernamePasswordAuthenticationToken}.
 */
@Component
public class JwtAuthenticationManager implements ReactiveAuthenticationManager {

    private final JwtTokenProvider jwtTokenProvider;

    public JwtAuthenticationManager(JwtTokenProvider jwtTokenProvider) {
        this.jwtTokenProvider = jwtTokenProvider;
    }

    /**
     * Authenticates the token held in {@code authentication.getCredentials()}.
     *
     * @param authentication request whose credentials hold the raw token
     * @return the authenticated token; an empty {@code Mono} when credentials are missing or
     *         the token is not accepted; an error {@code Mono} when the provider throws
     */
    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
        Object credentials = authentication.getCredentials();
        if (credentials == null) {
            // No token supplied: treat like an unrecognised token instead of failing with an NPE.
            return Mono.empty();
        }
        String authToken = credentials.toString();
        try {
            String username = jwtTokenProvider.getUsernameFromToken(authToken);
            if (username == null || !jwtTokenProvider.validateToken(authToken)) {
                return Mono.empty();
            }
            // Map the authority names carried by the token to granted authorities.
            var authorities = jwtTokenProvider.getAuthoritiesFromToken(authToken)
                .stream()
                .map(SimpleGrantedAuthority::new)
                .toList();
            return Mono.just(new UsernamePasswordAuthenticationToken(
                username,
                null,
                authorities
            ));
        } catch (Exception e) {
            // Surface provider failures through the reactive pipeline.
            return Mono.error(e);
        }
    }
}
/**
 * Simplified stand-in for a JWT provider.
 *
 * <p>Tokens produced here are NOT real JWTs: they are the fixed prefix followed by the user
 * name, with no signature and no expiry. A real implementation should use a JWT library
 * (such as jjwt) and load the signing key from secure configuration.
 */
@Component
public class JwtTokenProvider {

    // Placeholders for a real implementation; not used by the simplified token format below.
    private static final String SECRET_KEY = "your-secret-key";
    private static final long EXPIRATION_TIME = 86400000; // 24 hours

    /** Prefix shared by every token this provider issues and accepts. */
    private static final String TOKEN_PREFIX = "jwt-token-for-";

    /**
     * Issues a token for the authenticated principal.
     *
     * @param authentication the authenticated principal
     * @return the token string, i.e. the prefix followed by the principal's name
     */
    public String generateToken(Authentication authentication) {
        return TOKEN_PREFIX + authentication.getName();
    }

    /**
     * Extracts the user name from a token.
     *
     * @param token the raw token, may be {@code null}
     * @return the user name, or {@code null} when the token is {@code null} or does not carry
     *         the expected prefix
     */
    public String getUsernameFromToken(String token) {
        if (!validateToken(token)) {
            return null;
        }
        return token.substring(TOKEN_PREFIX.length());
    }

    /**
     * Checks whether a token has the expected shape. A real implementation must also verify
     * the signature and the expiry time.
     *
     * @param token the raw token, may be {@code null}
     * @return {@code true} when the token is non-null and starts with the expected prefix
     */
    public boolean validateToken(String token) {
        return token != null && token.startsWith(TOKEN_PREFIX);
    }

    /**
     * Returns the authorities carried by a token. The simplified format carries none, so a
     * fixed single role is returned; a real implementation should parse them from the token.
     *
     * @param token the raw token (currently ignored)
     * @return the authority names granted to the token's owner
     */
    public List<String> getAuthoritiesFromToken(String token) {
        return List.of("ROLE_USER");
    }
}
/**
 * Web filter that authenticates requests carrying an {@code Authorization: Bearer <token>}
 * header and publishes the resulting authentication into the reactive security context.
 */
@Component
public class JwtAuthenticationFilter implements WebFilter {

    private final JwtAuthenticationManager authenticationManager;

    public JwtAuthenticationFilter(JwtAuthenticationManager authenticationManager) {
        this.authenticationManager = authenticationManager;
    }

    /**
     * Continues the filter chain exactly once per request: with the authentication in the
     * security context when the bearer token is accepted, and without it otherwise.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String authHeader = exchange.getRequest().getHeaders().getFirst("Authorization");
        if (authHeader == null || !authHeader.startsWith("Bearer ")) {
            return chain.filter(exchange);
        }
        String token = authHeader.substring(7);
        return authenticationManager.authenticate(
                new UsernamePasswordAuthenticationToken(null, token))
            // The unauthenticated fallback must come BEFORE the flatMap. chain.filter(...)
            // is a Mono<Void> and therefore always completes empty, so a switchIfEmpty placed
            // after the flatMap would run the chain a second time on every authenticated
            // request. then(Mono.empty()) keeps this branch from feeding the flatMap below.
            .switchIfEmpty(chain.filter(exchange).then(Mono.empty()))
            .flatMap(authentication ->
                // Expose the authentication to downstream handlers via the Reactor context.
                chain.filter(exchange)
                    .contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication)));
    }
}
方法级安全示例
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * User operations guarded by method-level security expressions.
 *
 * <p>NOTE(review): two expressions read {@code authentication.principal.id}; this presumes
 * the principal object exposes an {@code id} property - confirm against the authentication
 * setup actually in use.
 */
@Service
public class SecureUserService {

    private final UserRepository userRepository;

    public SecureUserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /** Lists all users; restricted to the ADMIN role. */
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        return this.userRepository.findAll();
    }

    /** Looks a user up by id; allowed for the USER and ADMIN roles. */
    @PreAuthorize("hasAnyRole('USER', 'ADMIN')")
    public Mono<User> getUserById(Long id) {
        return this.userRepository.findById(id);
    }

    /** Looks a user up by id; allowed for the user with that id, or for an ADMIN. */
    @PreAuthorize("#id == authentication.principal.id or hasRole('ADMIN')")
    public Mono<User> getOwnUser(Long id) {
        return this.userRepository.findById(id);
    }

    /** Creates a user; restricted to the ADMIN role. */
    @PreAuthorize("hasRole('ADMIN')")
    public Mono<User> createUser(User user) {
        return this.userRepository.save(user);
    }

    /** Saves a user; allowed when the user's id matches the caller's, or for an ADMIN. */
    @PreAuthorize("#user.id == authentication.principal.id or hasRole('ADMIN')")
    public Mono<User> updateUser(User user) {
        return this.userRepository.save(user);
    }

    /** Deletes a user by id; restricted to the ADMIN role. */
    @PreAuthorize("hasRole('ADMIN')")
    public Mono<Void> deleteUser(Long id) {
        return this.userRepository.deleteById(id);
    }
}
5. 响应式测试支持
WebTestClient
- 响应式 Web 应用测试工具
- 支持对 WebFlux 应用进行集成测试
- 提供流畅的 API 进行断言和验证
WebTestClient 示例
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;
/**
 * Web-layer tests for {@code UserController}: requests are issued through
 * {@link WebTestClient} while {@code UserService} is replaced by a Mockito mock.
 */
@WebFluxTest(UserController.class)
public class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    /** GET /api/users returns every user supplied by the service. */
    @Test
    public void testGetAllUsers() {
        User alice = new User(1L, "Alice", "alice@example.com", 25);
        User bob = new User(2L, "Bob", "bob@example.com", 30);
        given(userService.findAll()).willReturn(Flux.just(alice, bob));

        var response = webTestClient.get().uri("/api/users").exchange();

        response.expectStatus().isOk();
        response.expectBodyList(User.class).hasSize(2).contains(alice, bob);
    }

    /** GET /api/users/{id} returns the matching user. */
    @Test
    public void testGetUserById() {
        User alice = new User(1L, "Alice", "alice@example.com", 25);
        given(userService.findById(1L)).willReturn(Mono.just(alice));

        var response = webTestClient.get().uri("/api/users/1").exchange();

        response.expectStatus().isOk();
        response.expectBody(User.class).isEqualTo(alice);
    }

    /** GET /api/users/{id} answers 404 when the service finds nothing. */
    @Test
    public void testGetUserByIdNotFound() {
        given(userService.findById(999L)).willReturn(Mono.empty());

        var response = webTestClient.get().uri("/api/users/999").exchange();

        response.expectStatus().isNotFound();
    }

    /** POST /api/users answers 201 with the saved user. */
    @Test
    public void testCreateUser() {
        User requestBody = new User(null, "Charlie", "charlie@example.com", 28);
        User persisted = new User(3L, "Charlie", "charlie@example.com", 28);
        given(userService.save(requestBody)).willReturn(Mono.just(persisted));

        var response = webTestClient.post().uri("/api/users").bodyValue(requestBody).exchange();

        response.expectStatus().isCreated();
        response.expectBody(User.class).isEqualTo(persisted);
    }

    /** PUT /api/users/{id} answers 200 with the updated user. */
    @Test
    public void testUpdateUser() {
        User revised = new User(1L, "Alice Updated", "alice.new@example.com", 26);
        given(userService.update(1L, revised)).willReturn(Mono.just(revised));

        var response = webTestClient.put().uri("/api/users/1").bodyValue(revised).exchange();

        response.expectStatus().isOk();
        response.expectBody(User.class).isEqualTo(revised);
    }

    /** DELETE /api/users/{id} answers 204 and delegates to the service. */
    @Test
    public void testDeleteUser() {
        given(userService.deleteById(1L)).willReturn(Mono.empty());

        var response = webTestClient.delete().uri("/api/users/1").exchange();

        response.expectStatus().isNoContent();
        verify(userService).deleteById(1L);
    }

    /** GET /api/users/search?name=... returns the users matched by the service. */
    @Test
    public void testSearchUsers() {
        User alice = new User(1L, "Alice", "alice@example.com", 25);
        User aliceSmith = new User(2L, "Alice Smith", "alice.smith@example.com", 30);
        given(userService.search("Alice", null)).willReturn(Flux.just(alice, aliceSmith));

        var response = webTestClient.get()
            .uri(uriBuilder -> uriBuilder
                .path("/api/users/search")
                .queryParam("name", "Alice")
                .build())
            .exchange();

        response.expectStatus().isOk();
        response.expectBodyList(User.class).hasSize(2);
    }
}
StepVerifier
- 用于测试 Reactor 流的测试工具
- 支持对 Mono 和 Flux 进行精确测试
- 提供时间虚拟化等高级测试功能
StepVerifier 示例
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;
import java.time.Duration;
/**
 * StepVerifier examples covering the most common {@code Mono} / {@code Flux} operators.
 */
public class ReactorTest {

    @Test
    public void testMonoJust() {
        Mono<String> mono = Mono.just("Hello, Reactor!");
        StepVerifier.create(mono)
            .expectNext("Hello, Reactor!")
            .verifyComplete();
    }

    @Test
    public void testMonoError() {
        Mono<String> mono = Mono.error(new RuntimeException("Test error"));
        StepVerifier.create(mono)
            .verifyError(RuntimeException.class);
    }

    @Test
    public void testFluxJust() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
        StepVerifier.create(flux)
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
    }

    @Test
    public void testFluxRange() {
        Flux<Integer> flux = Flux.range(1, 10);
        StepVerifier.create(flux)
            .expectNextCount(10)
            .verifyComplete();
    }

    @Test
    public void testFluxMap() {
        Flux<Integer> flux = Flux.range(1, 5)
            .map(i -> i * 2);
        StepVerifier.create(flux)
            .expectNext(2, 4, 6, 8, 10)
            .verifyComplete();
    }

    @Test
    public void testFluxFilter() {
        Flux<Integer> flux = Flux.range(1, 10)
            .filter(i -> i % 2 == 0);
        StepVerifier.create(flux)
            .expectNext(2, 4, 6, 8, 10)
            .verifyComplete();
    }

    @Test
    public void testFluxError() {
        Flux<String> flux = Flux.just("A", "B", "C")
            .map(s -> {
                if (s.equals("B")) {
                    throw new RuntimeException("Error on B");
                }
                return s;
            });
        StepVerifier.create(flux)
            .expectNext("A")
            .verifyError(RuntimeException.class);
    }

    @Test
    public void testFluxOnErrorResume() {
        Flux<String> flux = Flux.just("A", "B", "C")
            .map(s -> {
                if (s.equals("B")) {
                    throw new RuntimeException("Error on B");
                }
                return s;
            })
            .onErrorResume(error -> Flux.just("Fallback"));
        StepVerifier.create(flux)
            .expectNext("A", "Fallback")
            .verifyComplete();
    }

    @Test
    public void testFluxCollectList() {
        Mono<List<Integer>> mono = Flux.range(1, 5)
            .collectList();
        StepVerifier.create(mono)
            .expectNextMatches(list -> list.size() == 5)
            .verifyComplete();
    }

    @Test
    public void testFluxReduce() {
        Mono<Integer> mono = Flux.range(1, 5)
            .reduce((a, b) -> a + b);
        StepVerifier.create(mono)
            .expectNext(15)
            .verifyComplete();
    }

    @Test
    public void testFluxBuffer() {
        Flux<List<Integer>> flux = Flux.range(1, 10)
            .buffer(3);
        StepVerifier.create(flux)
            .expectNextMatches(list -> list.equals(List.of(1, 2, 3)))
            .expectNextMatches(list -> list.equals(List.of(4, 5, 6)))
            .expectNextMatches(list -> list.equals(List.of(7, 8, 9)))
            .expectNextMatches(list -> list.equals(List.of(10)))
            .verifyComplete();
    }

    /**
     * Verifies a timed sequence without waiting in real time.
     *
     * <p>The publisher is created inside the supplier so that it is assembled only after
     * {@code withVirtualTime} has installed the virtual scheduler; StepVerifier also takes
     * care of resetting the scheduler afterwards, so no manual global state handling is needed.
     */
    @Test
    public void testFluxDelayElements() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofMillis(100)).take(3))
            .expectSubscription()
            .expectNoEvent(Duration.ofMillis(100))
            .expectNext(0L)
            .expectNoEvent(Duration.ofMillis(100))
            .expectNext(1L)
            .expectNoEvent(Duration.ofMillis(100))
            .expectNext(2L)
            .verifyComplete();
    }

    /** A source slower than the timeout is replaced by the fallback value. */
    @Test
    public void testMonoTimeout() {
        Mono<String> mono = Mono.fromSupplier(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return "Delayed result";
        }).timeout(Duration.ofMillis(1000), Mono.just("Timeout fallback"));
        StepVerifier.create(mono)
            .expectNext("Timeout fallback")
            .verifyComplete();
    }

    /**
     * Both sources are synchronous here, so the merged order is deterministic; {@code merge}
     * gives no ordering guarantee for asynchronous sources.
     */
    @Test
    public void testFluxMerge() {
        Flux<Integer> flux1 = Flux.range(1, 3);
        Flux<Integer> flux2 = Flux.range(4, 3);
        Flux<Integer> merged = Flux.merge(flux1, flux2);
        StepVerifier.create(merged)
            .expectNext(1, 2, 3, 4, 5, 6)
            .verifyComplete();
    }

    @Test
    public void testFluxZip() {
        Flux<String> flux1 = Flux.just("A", "B", "C");
        Flux<Integer> flux2 = Flux.just(1, 2, 3);
        Flux<String> zipped = Flux.zip(flux1, flux2)
            .map(tuple -> tuple.getT1() + "-" + tuple.getT2());
        StepVerifier.create(zipped)
            .expectNext("A-1", "B-2", "C-3")
            .verifyComplete();
    }

    @Test
    public void testFluxDistinct() {
        Flux<Integer> flux = Flux.just(1, 2, 2, 3, 3, 3, 4)
            .distinct();
        StepVerifier.create(flux)
            .expectNext(1, 2, 3, 4)
            .verifyComplete();
    }

    @Test
    public void testFluxTakeAndSkip() {
        Flux<Integer> flux = Flux.range(1, 10)
            .skip(2)
            .take(5);
        StepVerifier.create(flux)
            .expectNext(3, 4, 5, 6, 7)
            .verifyComplete();
    }
}
6. 响应式编程优势
资源利用率
- 少量固定线程处理大量并发请求
- 减少线程上下文切换开销
- 提高系统吞吐量和可伸缩性
弹性和容错
- 内置超时、重试、熔断等机制
- 优雅的错误处理和降级策略
- 更好的系统弹性
7. 响应式编程模型对比
graph TD
A[传统阻塞模型] --> B[每个请求一个线程]
C[响应式非阻塞模型] --> D[少量线程处理大量请求]
B --> E[线程等待 I/O]
B --> F[资源消耗大]
B --> G[并发能力受限]
D --> H[事件驱动]
D --> I[资源消耗小]
D --> J[高并发能力]
8. 响应式编程适用场景
- 高并发、低延迟的应用
- 流式数据处理应用
- 实时通信应用(如聊天、推送)
- 微服务架构中的服务间通信
- I/O 密集型应用
9. 响应式高级特性
响应式 Kafka
- 完全非阻塞的 Kafka 生产者和消费者
- 支持背压和流控制
- 与 Spring Boot 无缝集成
响应式 Kafka 配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Reactive Kafka wiring: receiver/sender options plus the consumer and producer templates
 * built from them. Broker address, group id and topic are hard-coded for the example.
 */
@Configuration
public class ReactiveKafkaConfig {

    /** Receiver options subscribed to {@code test-topic}. */
    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions() {
        ReceiverOptions<String, String> base = ReceiverOptions.create(consumerProperties());
        return base.subscription(Collections.singletonList("test-topic"));
    }

    /** Consumer template backed by the given receiver options. */
    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    /** Sender options for string keys and values. */
    @Bean
    public SenderOptions<String, String> kafkaSenderOptions() {
        return SenderOptions.create(producerProperties());
    }

    /** Producer template backed by the given sender options. */
    @Bean
    public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate(
            SenderOptions<String, String> senderOptions) {
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    // Consumer settings: string (de)serialization, start from the earliest offset.
    private static Map<String, Object> consumerProperties() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return settings;
    }

    // Producer settings: string serialization, acks=all, up to 3 retries.
    private static Map<String, Object> producerProperties() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.ACKS_CONFIG, "all");
        settings.put(ProducerConfig.RETRIES_CONFIG, 3);
        return settings;
    }
}
响应式 Kafka 消费者
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;
/**
 * Reactive Kafka consumer helpers built on {@code ReactiveKafkaConsumerTemplate}.
 */
@Service
public class KafkaConsumerService {

    private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;

    public KafkaConsumerService(
            ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate) {
        this.kafkaConsumerTemplate = kafkaConsumerTemplate;
    }

    /**
     * Streams the raw record values, logging value, partition and offset of each record.
     * Offsets are not acknowledged by this method.
     *
     * @return stream of message values
     */
    public Flux<String> consumeMessages() {
        return kafkaConsumerTemplate.receive()
            .doOnNext(incoming -> {
                System.out.println("Received message: " + incoming.value());
                System.out.println("Partition: " + incoming.partition());
                System.out.println("Offset: " + incoming.offset());
            })
            .doOnError(failure -> System.err.println("Error consuming message: " + failure))
            .map(incoming -> incoming.value());
    }

    /**
     * Processes each record and commits its offset before emitting the processed value.
     *
     * @return stream of processed message values
     */
    public Flux<String> consumeAndProcessMessages() {
        return kafkaConsumerTemplate.receive()
            .flatMap(incoming -> {
                String outcome = processMessage(incoming.value());
                // Manual acknowledgement: emit the result only after the commit completes.
                return incoming.receiverOffset().commit().thenReturn(outcome);
            })
            .doOnNext(outcome -> System.out.println("Processed: " + outcome))
            .doOnError(failure -> System.err.println("Error processing message: " + failure));
    }

    /**
     * Groups record values into lists of at most {@code batchSize} elements.
     *
     * @param batchSize maximum number of values per emitted list
     * @return stream of value batches
     */
    public Flux<List<String>> consumeMessagesBatch(int batchSize) {
        return kafkaConsumerTemplate.receive()
            .map(incoming -> incoming.value())
            .buffer(batchSize)
            .doOnNext(chunk ->
                System.out.println("Processing batch of " + chunk.size() + " messages"));
    }

    // Simulated processing step: upper-cases the payload and prefixes it.
    private String processMessage(String message) {
        String upperCased = message.toUpperCase();
        return "Processed: " + upperCased;
    }
}
响应式 Kafka 生产者
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
/**
 * Reactive Kafka producer helpers built on {@code ReactiveKafkaProducerTemplate}.
 */
@Service
public class KafkaProducerService {

    private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;

    public KafkaProducerService(
            ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate) {
        this.kafkaProducerTemplate = kafkaProducerTemplate;
    }

    /**
     * Sends a single message and logs where it was written.
     *
     * @return a {@code Mono} that completes when the send has been acknowledged
     */
    public Mono<Void> sendMessage(String topic, String key, String value) {
        return kafkaProducerTemplate.send(topic, key, value)
            .doOnSuccess(result -> {
                System.out.println("Message sent successfully");
                System.out.println("Topic: " + result.recordMetadata().topic());
                System.out.println("Partition: " + result.recordMetadata().partition());
                System.out.println("Offset: " + result.recordMetadata().offset());
            })
            .doOnError(error -> System.err.println("Error sending message: " + error))
            .then();
    }

    /**
     * Sends a single message and returns the send result.
     *
     * <p>The record is wrapped in a {@code SenderRecord} whose correlation metadata is the
     * message value, which is what makes the result type {@code SenderResult<String>}; the
     * plain {@code send(topic, key, value)} overload yields {@code SenderResult<Void>} and
     * would not match this method's return type.
     *
     * @return the send result, with the message value as correlation metadata
     */
    public Mono<SenderResult<String>> sendMessageWithResult(String topic, String key, String value) {
        return kafkaProducerTemplate.send(SenderRecord.create(
                new org.apache.kafka.clients.producer.ProducerRecord<>(topic, key, value),
                value
            ))
            .doOnSuccess(result -> System.out.println("Message sent: " + value))
            .doOnError(error -> System.err.println("Error sending message: " + error));
    }

    /**
     * Sends every message (with a {@code null} key) and emits one result per message; each
     * result carries the message text as correlation metadata.
     */
    public Flux<SenderResult<String>> sendMessagesBatch(
            String topic,
            List<String> messages) {
        return Flux.fromIterable(messages)
            .flatMap(message ->
                kafkaProducerTemplate.send(SenderRecord.create(
                    new org.apache.kafka.clients.producer.ProducerRecord<>(topic, null, message),
                    message
                ))
            )
            .doOnNext(result ->
                System.out.println("Sent message: " + result.correlationMetadata())
            )
            .doOnError(error ->
                System.err.println("Error sending batch: " + error)
            );
    }

    /** Sends a message with an explicit record timestamp and no explicit partition. */
    public Mono<Void> sendMessageWithTimestamp(
            String topic,
            String key,
            String value,
            long timestamp) {
        return kafkaProducerTemplate.send(
            new org.apache.kafka.clients.producer.ProducerRecord<>(
                topic,
                null,
                timestamp,
                key,
                value
            )
        ).then();
    }

    /** Sends a message to the given partition of the topic. */
    public Mono<Void> sendMessageToPartition(
            String topic,
            int partition,
            String key,
            String value) {
        return kafkaProducerTemplate.send(
            new org.apache.kafka.clients.producer.ProducerRecord<>(
                topic,
                partition,
                key,
                value
            )
        ).then();
    }
}
响应式 Kafka 监听器
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;
import jakarta.annotation.PostConstruct;
/**
 * Starts consuming Kafka messages once the bean has been constructed and offers a simple
 * request/response helper on top of the consumer and producer services.
 */
@Component
public class KafkaMessageListener {

    private final KafkaConsumerService kafkaConsumerService;
    private final KafkaProducerService kafkaProducerService;

    public KafkaMessageListener(
            KafkaConsumerService kafkaConsumerService,
            KafkaProducerService kafkaProducerService) {
        this.kafkaConsumerService = kafkaConsumerService;
        this.kafkaProducerService = kafkaProducerService;
    }

    /**
     * Subscribes to the processed-message stream and logs every signal. The resulting
     * subscription is not retained.
     */
    @PostConstruct
    public void startListening() {
        Flux<String> processed = kafkaConsumerService.consumeAndProcessMessages();
        processed.subscribe(
            item -> System.out.println("Processed message: " + item),
            failure -> System.err.println("Error in listener: " + failure),
            () -> System.out.println("Listener completed")
        );
    }

    /**
     * For each consumed message, publishes a reply to {@code responseTopic} and emits the
     * reply text once it has been sent.
     *
     * @param requestTopic  not used by this implementation - NOTE(review): confirm whether the
     *                      consumer should be restricted to this topic
     * @param responseTopic topic the replies are sent to
     * @return stream of the reply texts
     */
    public Flux<String> requestResponsePattern(String requestTopic, String responseTopic) {
        Flux<String> requests = kafkaConsumerService.consumeMessages();
        return requests.flatMap(incoming -> {
            String reply = "Response to: " + incoming;
            return kafkaProducerService.sendMessage(responseTopic, null, reply)
                .thenReturn(reply);
        });
    }
}
响应式 WebSocket
- 完全非阻塞的 WebSocket 通信
- 支持双向实时通信
- 与 Spring WebFlux 无缝集成
WebSocket 配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
/**
 * WebSocket wiring: maps {@code /ws/echo} and {@code /ws/chat} to their handlers and
 * registers the adapter that lets WebFlux invoke {@link WebSocketHandler} beans.
 */
@Configuration
public class WebSocketConfig {

    /** URL-to-handler mapping for the two WebSocket endpoints, registered with order 1. */
    @Bean
    public HandlerMapping webSocketHandlerMapping(WebSocketHandler echoHandler) {
        Map<String, WebSocketHandler> routes = new HashMap<>();
        routes.put("/ws/echo", echoHandler);
        routes.put("/ws/chat", chatHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(1);
        mapping.setUrlMap(routes);
        return mapping;
    }

    /** Handler that logs every incoming text and sends it back prefixed with "Echo: ". */
    @Bean
    public WebSocketHandler echoHandler() {
        return session -> session.send(
            session.receive().map(incoming -> {
                String text = incoming.getPayloadAsText();
                System.out.println("Received: " + text);
                return session.textMessage("Echo: " + text);
            })
        );
    }

    /**
     * Handler that logs every incoming text and replies on the same session with a
     * "Broadcast: " prefix. As written, the reply goes only to the sending session.
     */
    @Bean
    public WebSocketHandler chatHandler() {
        return session -> session.send(
            session.receive().map(incoming -> {
                String text = incoming.getPayloadAsText();
                System.out.println("Chat message: " + text);
                return session.textMessage("Broadcast: " + text);
            })
        );
    }

    /** Adapter required for WebFlux to dispatch to {@link WebSocketHandler} instances. */
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
WebSocket 服务
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Keeps track of open WebSocket sessions and sends text messages to one, all, or all but
 * one of them.
 */
@Service
public class WebSocketService {

    private final Sinks.Many<String> messageSink = Sinks.many().multicast().onBackpressureBuffer();
    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    /** Registers a session under the given id. */
    public void addSession(String sessionId, WebSocketSession session) {
        sessions.put(sessionId, session);
        System.out.println("Session added: " + sessionId);
    }

    /** Forgets the session registered under the given id. */
    public void removeSession(String sessionId) {
        sessions.remove(sessionId);
        System.out.println("Session removed: " + sessionId);
    }

    /**
     * Pushes the message into the internal sink (the emit result is ignored) and sends it to
     * every registered session.
     *
     * @return completes when all sends have completed
     */
    public Mono<Void> broadcast(String message) {
        messageSink.tryEmitNext(message);
        return Flux.fromIterable(sessions.values())
            .flatMap(target -> deliver(target, message))
            .then();
    }

    /**
     * Sends the message to one session.
     *
     * @return completes after the send, or immediately when no such session is registered
     */
    public Mono<Void> sendToSession(String sessionId, String message) {
        WebSocketSession target = sessions.get(sessionId);
        if (target == null) {
            return Mono.empty();
        }
        return deliver(target, message);
    }

    /** Emits a heartbeat text carrying the current time every 30 seconds. */
    public Flux<String> heartbeat() {
        return Flux.interval(Duration.ofSeconds(30))
            .map(tick -> "Heartbeat: " + System.currentTimeMillis());
    }

    /**
     * Relays a chat message, prefixed with the sender's session id, to every registered
     * session except the sender.
     *
     * @return completes when all sends have completed
     */
    public Mono<Void> handleChatMessage(WebSocketSession session, String message) {
        String senderId = session.getId();
        System.out.println("Chat message from " + senderId + ": " + message);
        return Flux.fromIterable(sessions.values())
            .filter(candidate -> !candidate.getId().equals(senderId))
            .flatMap(recipient -> deliver(recipient, senderId + ": " + message))
            .then();
    }

    // Sends one text frame on the given session.
    private Mono<Void> deliver(WebSocketSession target, String text) {
        return target.send(Mono.just(target.textMessage(text)));
    }
}
WebSocket 处理器
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
/**
 * Chat endpoint: registers the session, relays every incoming text to the other sessions,
 * echoes it back to the sender, and unregisters the session when the connection ends.
 */
@Component
public class ChatWebSocketHandler implements WebSocketHandler {

    private final WebSocketService webSocketService;

    public ChatWebSocketHandler(WebSocketService webSocketService) {
        this.webSocketService = webSocketService;
    }

    /**
     * Handles one WebSocket connection.
     *
     * <p>The relay to other sessions is chained into the session pipeline with
     * {@code concatMap} instead of a detached {@code subscribe()}, so messages are relayed in
     * arrival order and the relay is cancelled together with the session. Relay failures are
     * logged and skipped so that the sender still receives its echo.
     *
     * @param session the session of the connected client
     * @return completes when the outbound stream of the session has finished
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String sessionId = session.getId();
        webSocketService.addSession(sessionId, session);
        return session.send(
            session.receive()
                // Decode the payload while the inbound message is still being handled.
                .map(message -> message.getPayloadAsText())
                .concatMap(payload -> {
                    System.out.println("Received from " + sessionId + ": " + payload);
                    return webSocketService.handleChatMessage(session, payload)
                        .onErrorResume(error -> {
                            System.err.println(
                                "Failed to relay message from " + sessionId + ": " + error);
                            return Mono.empty();
                        })
                        // Echo back to the sender once the relay has finished.
                        .thenReturn(session.textMessage("You: " + payload));
                })
        ).doFinally(signalType -> {
            // Clean up when the session ends, whatever the reason.
            webSocketService.removeSession(sessionId);
        });
    }
}
响应式 HTTP 客户端
- 非阻塞的 HTTP 客户端
- 支持流式响应
- 基于 WebClient,与 Spring WebFlux 无缝集成
WebClient 配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
 * Declares two {@link WebClient} beans that target {@code https://api.example.com} over
 * Reactor Netty.
 */
@Configuration
public class WebClientConfig {

    /**
     * Client with a 5 s connect timeout, a 5 s response timeout, 10 s read/write timeout
     * handlers and a default JSON content type.
     */
    @Bean
    public WebClient webClient() {
        HttpClient nettyClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofSeconds(5))
            .doOnConnected(connection -> {
                connection.addHandlerLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));
                connection.addHandlerLast(new WriteTimeoutHandler(10, TimeUnit.SECONDS));
            });

        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(nettyClient);
        return WebClient.builder()
            .clientConnector(connector)
            .baseUrl("https://api.example.com")
            .defaultHeader("Content-Type", "application/json")
            .build();
    }

    /**
     * Client with only a 5 s response timeout. Despite the bean name, no retry behaviour is
     * configured here.
     */
    @Bean
    public WebClient webClientWithRetry() {
        HttpClient nettyClient = HttpClient.create().responseTimeout(Duration.ofSeconds(5));

        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(nettyClient);
        return WebClient.builder()
            .clientConnector(connector)
            .baseUrl("https://api.example.com")
            .build();
    }
}
WebClient 服务
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
/**
 * Examples of calling a remote user API through {@link WebClient}: basic CRUD calls,
 * streaming, retry, timeout, fan-out, error handling and binary transfer.
 */
@Service
public class ApiClientService {

    private final WebClient webClient;

    public ApiClientService(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * Fetches one user (GET).
     *
     * <p>Logging uses {@code doOnNext}: unlike {@code doOnSuccess}, it is not invoked with
     * {@code null} when the response has no body, so the log statement cannot throw.
     */
    public Mono<User> getUser(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .doOnNext(user -> System.out.println("User fetched: " + user.getName()))
            .doOnError(error -> System.err.println("Error fetching user: " + error));
    }

    /** Creates a user (POST); logs the created id when a body is returned. */
    public Mono<User> createUser(User user) {
        return webClient.post()
            .uri("/users")
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class)
            .doOnNext(createdUser ->
                System.out.println("User created: " + createdUser.getId())
            );
    }

    /** Replaces a user (PUT). */
    public Mono<User> updateUser(Long id, User user) {
        return webClient.put()
            .uri("/users/{id}", id)
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class);
    }

    /** Deletes a user (DELETE). */
    public Mono<Void> deleteUser(Long id) {
        return webClient.delete()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(Void.class);
    }

    /** Streams all users, logging each one as it arrives. */
    public Flux<User> getAllUsers() {
        return webClient.get()
            .uri("/users")
            .retrieve()
            .bodyToFlux(User.class)
            .doOnNext(user -> System.out.println("User: " + user.getName()));
    }

    /**
     * Fetches one user, retrying up to 3 times with exponential backoff (starting at 1 s),
     * but only when the failure is a 5xx response.
     */
    public Mono<User> getUserWithRetry(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(throwable ->
                    throwable instanceof WebClientResponseException responseError
                        && responseError.getStatusCode().is5xxServerError()
                )
            );
    }

    /** Fetches one user, failing if no result arrives within 3 seconds. */
    public Mono<User> getUserWithTimeout(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .timeout(Duration.ofSeconds(3));
    }

    /**
     * Fetches several users and emits them as they arrive; because {@code flatMap} is used,
     * the output order may differ from the order of {@code ids}.
     */
    public Flux<User> batchGetUsers(List<Long> ids) {
        return Flux.fromIterable(ids)
            .flatMap(id ->
                webClient.get()
                    .uri("/users/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
            );
    }

    /**
     * Fetches several users and collects them into one list; the list order may differ from
     * the order of {@code ids} for the same reason as in {@link #batchGetUsers(List)}.
     */
    public Mono<List<User>> concurrentGetUsers(List<Long> ids) {
        return Flux.fromIterable(ids)
            .flatMap(id ->
                webClient.get()
                    .uri("/users/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
            )
            .collectList();
    }

    /**
     * Fetches one user and converts any failure into an empty result after logging it.
     * 4xx and 5xx responses are mapped to {@code RuntimeException} by the status handlers,
     * so they are logged by the generic fallback.
     */
    public Mono<User> getUserWithErrorHandling(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .onStatus(
                status -> status.is4xxClientError(),
                response -> Mono.error(new RuntimeException("Client error"))
            )
            .onStatus(
                status -> status.is5xxServerError(),
                response -> Mono.error(new RuntimeException("Server error"))
            )
            .bodyToMono(User.class)
            .onErrorResume(WebClientResponseException.class, ex -> {
                System.err.println("HTTP Error: " + ex.getStatusCode());
                return Mono.empty();
            })
            .onErrorResume(Exception.class, ex -> {
                System.err.println("Error: " + ex.getMessage());
                return Mono.empty();
            });
    }

    /**
     * Uploads raw bytes (POST /upload) and returns the response body as text.
     *
     * @param fileName    not used by this implementation - NOTE(review): confirm whether the
     *                    name should be sent along with the content
     * @param fileContent bytes sent as the request body
     */
    public Mono<String> uploadFile(String fileName, byte[] fileContent) {
        return webClient.post()
            .uri("/upload")
            .bodyValue(fileContent)
            .retrieve()
            .bodyToMono(String.class);
    }

    /** Downloads a file into memory as a byte array. */
    public Mono<byte[]> downloadFile(String fileId) {
        return webClient.get()
            .uri("/files/{id}", fileId)
            .retrieve()
            .bodyToMono(byte[].class);
    }
}
响应式缓存
- 非阻塞的缓存操作
- 支持 Redis、Caffeine 等缓存实现
- 与 Spring Cache 无缝集成
响应式缓存配置
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.ReactiveRedisCacheConfiguration;
import org.springframework.data.redis.cache.ReactiveRedisCacheManager;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
/**
 * Cache configuration that enables Spring's caching annotations and declares a Redis-backed
 * cache manager whose defaults are a 10-minute entry TTL, no caching of null values, and
 * JSON value serialization.
 *
 * NOTE(review): ReactiveRedisCacheManager / ReactiveRedisCacheConfiguration could not be
 * confirmed against the Spring Data Redis API - verify these types exist in the version in
 * use before relying on this snippet.
 */
@Configuration
@EnableCaching
public class ReactiveCacheConfig {
/**
 * Builds the cache manager on top of the supplied reactive Redis connection factory.
 *
 * @param connectionFactory reactive Redis connection factory passed to the manager builder
 * @return the cache manager built with the defaults described on the class
 */
@Bean
public ReactiveRedisCacheManager cacheManager(
ReactiveRedisConnectionFactory connectionFactory
) {
// Serialization pair that writes cached values with GenericJackson2JsonRedisSerializer.
RedisSerializationContext.SerializationPair<Object> jsonSerializer =
RedisSerializationContext.SerializationPair.fromSerializer(
new GenericJackson2JsonRedisSerializer()
);
return ReactiveRedisCacheManager.builder(connectionFactory)
.cacheDefaults(
ReactiveRedisCacheConfiguration.defaultCacheConfig()
// Entries expire 10 minutes after being written.
.entryTtl(Duration.ofMinutes(10))
.disableCachingNullValues()
.serializeValuesWith(jsonSerializer)
)
.build();
}
}
响应式缓存服务
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * User lookups and updates decorated with Spring's caching annotations on the {@code users}
 * cache.
 *
 * <p>The log statements run when a method is invoked, i.e. while the reactive pipeline is
 * being assembled, not when it is subscribed to.
 */
@Service
public class CacheService {

    private final UserRepository userRepository;

    public CacheService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /** Cached read of a single user, keyed by id. */
    @Cacheable(value = "users", key = "#id")
    public Mono<User> getUserById(Long id) {
        System.out.println("Fetching user from database: " + id);
        Mono<User> lookup = this.userRepository.findById(id);
        return lookup;
    }

    /** Saves the user and refreshes the cache entry keyed by the user's id. */
    @CachePut(value = "users", key = "#user.id")
    public Mono<User> updateUser(User user) {
        System.out.println("Updating user in database: " + user.getId());
        Mono<User> saved = this.userRepository.save(user);
        return saved;
    }

    /** Deletes the user and evicts the cache entry keyed by id. */
    @CacheEvict(value = "users", key = "#id")
    public Mono<Void> deleteUser(Long id) {
        System.out.println("Deleting user from database: " + id);
        Mono<Void> removal = this.userRepository.deleteById(id);
        return removal;
    }

    /** Evicts every entry of the {@code users} cache; performs no repository call. */
    @CacheEvict(value = "users", allEntries = true)
    public Mono<Void> clearAllCache() {
        System.out.println("Clearing all user cache");
        return Mono.empty();
    }

    /** Cached read of all users under the fixed key {@code 'all'}. */
    @Cacheable(value = "users", key = "'all'")
    public Flux<User> getAllUsers() {
        System.out.println("Fetching all users from database");
        Flux<User> everyone = this.userRepository.findAll();
        return everyone;
    }

    /** Cached read of a single user that skips caching when the result is {@code null}. */
    @Cacheable(value = "users", key = "#id", unless = "#result == null")
    public Mono<User> getUserByIdConditional(Long id) {
        Mono<User> lookup = this.userRepository.findById(id);
        return lookup;
    }
}
Spring 的响应式编程特性提供了一套完整的工具链,使开发者能够构建高效、可伸缩的响应式应用程序,同时保持了 Spring 框架一贯的开发便利性。
10. 性能对比
传统阻塞模型 vs 响应式非阻塞模型
吞吐量对比
| 指标 | 传统阻塞模型 | 响应式非阻塞模型 |
|---|---|---|
| 单机并发连接数 | ~200-500 | ~10,000-100,000 |
| 请求响应时间 | 100-500ms | 10-50ms |
| CPU 利用率 | 30-50%(线程阻塞) | 70-90%(持续处理) |
| 内存占用 | 高(每个线程栈 ~1MB) | 低(事件循环栈 ~256KB) |
| 线程上下文切换 | 频繁 | 极少 |
资源利用率对比
graph LR
A[传统阻塞模型] --> B[线程池: 200-500 线程]
C[响应式非阻塞模型] --> D[事件循环: 少量线程]
B --> E[内存: 200-500MB]
B --> F[CPU: 30-50%]
B --> G[并发: 200-500]
D --> H[内存: 10-50MB]
D --> I[CPU: 70-90%]
D --> J[并发: 10,000+]
性能基准测试示例
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Spring Boot entry point for the performance-comparison example.
 */
@SpringBootApplication
public class PerformanceComparisonApp {
/**
 * Starts the application by handing this class and the command-line
 * arguments to SpringApplication.run.
 *
 * @param args command-line arguments, forwarded unchanged
 */
public static void main(String[] args) {
SpringApplication.run(PerformanceComparisonApp.class, args);
}
}
/**
 * Endpoints used to compare a blocking implementation with a non-blocking one.
 *
 * <p>The "blocking" endpoints really block a thread ({@code Thread.sleep}) and are
 * therefore offloaded to {@code Schedulers.boundedElastic()}. The "reactive"
 * endpoints simulate the same latency with {@code Mono.delay}, which is timer
 * based and occupies no thread while waiting.
 */
@RestController
class PerformanceController {

    /** Simulated latency of a single I/O call. */
    private static final Duration SINGLE_CALL_LATENCY = Duration.ofMillis(100);

    /** Simulated latency of each item in the batch endpoints. */
    private static final Duration BATCH_ITEM_LATENCY = Duration.ofMillis(50);

    /**
     * Blocking variant: one worker thread sleeps for the whole simulated I/O call.
     *
     * @return the response, produced on a bounded-elastic worker
     */
    @GetMapping("/blocking")
    public Mono<String> blockingEndpoint() {
        return Mono.fromCallable(() -> {
            blockFor(SINGLE_CALL_LATENCY); // simulated blocking I/O
            return "Blocking response";
        }).subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Non-blocking variant: the latency is a timer, no thread is parked.
     *
     * @return the response, emitted once the delay has elapsed
     */
    @GetMapping("/reactive")
    public Mono<String> reactiveEndpoint() {
        return Mono.delay(SINGLE_CALL_LATENCY).thenReturn("Reactive response");
    }

    /**
     * Batch of 100 blocking calls, each one occupying a bounded-elastic worker.
     *
     * @return the 100 items, in completion order
     */
    @GetMapping("/batch-blocking")
    public Flux<String> batchBlocking() {
        return Flux.range(1, 100)
            .flatMap(i -> Mono.fromCallable(() -> {
                blockFor(BATCH_ITEM_LATENCY);
                return "Item " + i;
            }).subscribeOn(Schedulers.boundedElastic()));
    }

    /**
     * Batch of 100 non-blocking calls. Blocking must never run on
     * {@code Schedulers.parallel()}, so the latency is expressed as a delay.
     *
     * @return the 100 items, in completion order
     */
    @GetMapping("/batch-reactive")
    public Flux<String> batchReactive() {
        return Flux.range(1, 100)
            .flatMap(i -> Mono.delay(BATCH_ITEM_LATENCY).thenReturn("Item " + i));
    }

    /** Sleeps for the given latency; an interrupt is recorded on the thread instead of being swallowed. */
    private static void blockFor(Duration latency) {
        try {
            Thread.sleep(latency.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
性能优化建议
1. 线程池优化
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.Executors;
// Custom thread pool sized at twice the number of available processors.
// NOTE(review): customExecutor is created here but not used by any of the pipelines below.
ExecutorService customExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2
);
// CPU-bound work: split into rails, run them on Schedulers.parallel(), then merge back.
Flux.range(1, 1000)
.parallel()
.runOn(Schedulers.parallel())
.map(i -> heavyComputation(i))
.sequential()
.subscribe();
// I/O-bound work: wrap each call in a Mono subscribed on Schedulers.boundedElastic().
Flux.range(1, 1000)
.flatMap(i -> Mono.fromCallable(() -> ioOperation(i))
.subscribeOn(Schedulers.boundedElastic())
)
.subscribe();
// Single-threaded execution: downstream operators run on Schedulers.single().
Flux.range(1, 1000)
.publishOn(Schedulers.single())
.map(i -> i * 2)
.subscribe();
2. 连接池配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import java.time.Duration;
@Configuration
public class WebClientPoolConfig {
@Bean
public WebClient optimizedWebClient() {
// 配置连接池
ConnectionProvider provider = ConnectionProvider.builder("custom")
.maxConnections(500) // 最大连接数
.maxIdleTime(Duration.ofSeconds(20)) // 最大空闲时间
.maxLifeTime(Duration.ofSeconds(60)) // 连接最大生命周期
.pendingAcquireTimeout(Duration.ofSeconds(60)) // 获取连接超时
.evictInBackground(Duration.ofSeconds(120)) // 后台清理间隔
.build();
HttpClient httpClient = HttpClient.create(provider)
.responseTimeout(Duration.ofSeconds(5))
.keepAlive(true);
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.baseUrl("https://api.example.com")
.build();
}
}
3. 数据库连接优化
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;

/**
 * R2DBC configuration that exposes a pooled PostgreSQL {@link ConnectionFactory}.
 *
 * <p>{@code AbstractR2dbcConfiguration} requires {@link #connectionFactory()} to be
 * implemented; the URL, user name and password are kept as overridable helpers.
 * Pooling is requested through the {@code pool} driver segment of the URL, which
 * needs the r2dbc-pool module on the classpath.
 */
@Configuration
public class R2dbcPoolConfig extends AbstractR2dbcConfiguration {

    /**
     * Creates the connection factory from {@link #connectionUrl()} plus credentials.
     *
     * @return the connection factory discovered for the configured URL
     */
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactoryOptions options = ConnectionFactoryOptions.parse(connectionUrl())
            .mutate()
            .option(ConnectionFactoryOptions.USER, username())
            .option(ConnectionFactoryOptions.PASSWORD, password())
            .build();
        return ConnectionFactories.get(options);
    }

    /**
     * @return the R2DBC URL, including the pool settings
     */
    protected String connectionUrl() {
        return "r2dbc:pool:postgresql://localhost:5432/reactor_db"
            + "?maxSize=50"            // pool size
            + "&maxIdleTime=PT30S"     // maximum idle time
            + "&maxLifeTime=PT10M"     // maximum connection lifetime
            + "&acquireRetry=3";       // retries when acquiring a connection
    }

    /**
     * @return the database user name
     */
    protected String username() {
        return "postgres";
    }

    /**
     * @return the database password; sample value only, load real credentials from
     *         external configuration instead of source code
     */
    protected String password() {
        return "password";
    }
}
11. 最佳实践
1. 错误处理最佳实践
错误恢复策略
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class ErrorHandlingBestPractices {
// 1. 使用 onErrorResume 提供降级方案
public Mono<String> getUserWithFallback(Long id) {
return userRepository.findById(id)
.onErrorResume(UserNotFoundException.class, ex -> {
// 特定异常的降级处理
return Mono.just("Default User");
})
.onErrorResume(Exception.class, ex -> {
// 通用异常的降级处理
return Mono.just("Error: " + ex.getMessage());
});
}
// 2. 使用 retry 进行智能重试
public Mono<String> getUserWithRetry(Long id) {
return userRepository.findById(id)
.retryWhen(Retry.backoff(3, Duration.ofMillis(100))
.filter(throwable ->
throwable instanceof TimeoutException ||
throwable instanceof NetworkException
)
.doBeforeRetry(signal ->
System.out.println("Retry attempt: " + signal.totalRetries())
)
);
}
// 3. 使用 onErrorReturn 返回默认值
public Flux<String> getAllUsersWithDefault() {
return userRepository.findAll()
.onErrorReturn("Error", "User1", "User2");
}
// 4. 使用 doOnError 记录错误
public Mono<User> getUserWithLogging(Long id) {
return userRepository.findById(id)
.doOnError(error ->
logger.error("Failed to fetch user {}: {}", id, error.getMessage())
);
}
// 5. 使用 switchIfEmpty 处理空结果
public Mono<User> getUserOrThrow(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found")));
}
}
2. 背压处理最佳实践
import reactor.core.publisher.Flux;
import java.time.Duration;
public class BackpressureBestPractices {
// 1. 使用 onBackpressureBuffer 适用于可以缓冲的场景
public Flux<String> processWithBuffer() {
return Flux.interval(Duration.ofMillis(10))
.onBackpressureBuffer(100) // 缓冲 100 个元素
.flatMap(this::processItem)
.subscribeOn(Schedulers.boundedElastic());
}
// 2. 使用 onBackpressureDrop 适用于可以丢失数据的场景
public Flux<String> processWithDrop() {
return Flux.interval(Duration.ofMillis(10))
.onBackpressureDrop(dropped ->
logger.warn("Dropped item: {}", dropped)
)
.flatMap(this::processItem);
}
// 3. 使用 onBackpressureLatest 适用于只需要最新值的场景
public Flux<String> processWithLatest() {
return Flux.interval(Duration.ofMillis(10))
.onBackpressureLatest()
.flatMap(this::processItem);
}
// 4. 使用 limitRate 控制处理速率
public Flux<String> processWithRateLimit() {
return Flux.interval(Duration.ofMillis(10))
.limitRate(100) // 每秒最多处理 100 个元素
.flatMap(this::processItem);
}
// 5. 使用 sample 降频处理
public Flux<String> processWithSampling() {
return Flux.interval(Duration.ofMillis(10))
.sample(Duration.ofMillis(100)) // 每 100ms 取一个样本
.flatMap(this::processItem);
}
}
3. 资源管理最佳实践
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Duration;

/**
 * Resource-management patterns for Reactor pipelines. {@code processItem} and
 * {@code logger} are expected to be provided by the surrounding application code.
 */
public class ResourceManagementBestPractices {

    /**
     * 1. {@code using}: ties a resource to the lifetime of the subscription so it is
     * released on completion, error and cancellation.
     *
     * <p>Opening and querying a JDBC connection blocks, so the whole sequence is
     * subscribed on {@code Schedulers.boundedElastic()}.
     *
     * @param filePath kept for signature compatibility; not used by this example
     * @return the data read through the managed connection
     */
    public Flux<String> readFileWithResourceManagement(String filePath) {
        return Flux.<String, Connection>using(
            // resource supplier
            () -> {
                try {
                    return DriverManager.getConnection("jdbc:postgresql://localhost:5432/db");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            },
            // resource consumer: run the query with the open connection
            connection -> Mono.fromCallable(() -> "Data from database"),
            // resource cleanup
            connection -> {
                try {
                    if (connection != null && !connection.isClosed()) {
                        connection.close();
                    }
                } catch (Exception e) {
                    logger.error("Error closing connection", e);
                }
            }
        ).subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * 2. doFinally: runs the cleanup for every terminal signal (complete, error, cancel).
     */
    public Mono<Void> processWithCleanup() {
        return Flux.range(1, 10)
            .flatMap(i -> processItem(i))
            .doFinally(signalType -> {
                logger.info("Processing completed with signal: {}", signalType);
                cleanupResources();
            })
            .then();
    }

    /**
     * 3. doOnCancel: reacts specifically to the subscriber cancelling the stream.
     */
    public Flux<String> processWithCancelHandling() {
        return Flux.interval(Duration.ofMillis(100))
            .doOnCancel(() -> {
                logger.info("Stream was cancelled");
                cleanupResources();
            })
            .flatMap(this::processItem);
    }

    /**
     * 4. doOnDiscard: observes elements that upstream operators drop without
     * delivering them. The hook is registered per element type and applies to the
     * operators above it, so it is declared last in the chain.
     */
    public Flux<String> processWithDiscardHandling() {
        return Flux.interval(Duration.ofMillis(100))
            .onBackpressureDrop(dropped ->
                logger.warn("Item was dropped: {}", dropped))
            .flatMap(this::processItem)
            .doOnDiscard(Long.class, discarded ->
                logger.info("Item was discarded: {}", discarded));
    }

    private void cleanupResources() {
        // resource cleanup logic goes here
    }
}
4. 性能优化最佳实践
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Schedulers;
import java.time.Duration;
public class PerformanceOptimizationBestPractices {
// 1. 使用 parallel 进行并行处理
public Flux<Integer> parallelProcessing() {
return Flux.range(1, 1000)
.parallel() // 启用并行处理
.runOn(Schedulers.parallel()) // 在并行调度器上执行
.map(this::heavyComputation)
.sequential(); // 按顺序收集结果
}
// 2. 使用 flatMap 并发处理
public Flux<String> concurrentProcessing() {
return Flux.range(1, 100)
.flatMap(i ->
Mono.fromCallable(() -> processItem(i))
.subscribeOn(Schedulers.boundedElastic())
); // 默认并发度为 256
}
// 3. 使用 flatMapSequential 保持顺序
public Flux<String> orderedConcurrentProcessing() {
return Flux.range(1, 100)
.flatMapSequential(i ->
Mono.fromCallable(() -> processItem(i))
.subscribeOn(Schedulers.boundedElastic())
); // 保持顺序的并发处理
}
// 4. 使用 cache 缓存结果
public Mono<String> getCachedData(String key) {
return fetchDataFromDatabase(key)
.cache(); // 缓存结果,多次订阅只执行一次
}
// 5. 使用 cacheInvalidateWith 缓存带失效
public Flux<String> getStreamWithCache(Duration ttl) {
return Flux.interval(Duration.ofSeconds(1))
.cacheInvalidateWith(Duration.ofMinutes(5), () -> {
// 每 5 分钟失效缓存
return Flux.interval(Duration.ofSeconds(1));
});
}
// 6. 使用 delayElements 控制发射速率
public Flux<Long> rateLimitedStream() {
return Flux.interval(Duration.ofMillis(10))
.delayElements(Duration.ofMillis(50)); // 限制发射速率为 20 个/秒
}
// 7. 使用 take 避免无限流
public Flux<Long> limitedStream() {
return Flux.interval(Duration.ofMillis(100))
.take(100); // 只取前 100 个元素
}
// 8. 使用 timeout 避免长时间阻塞
public Mono<String> withTimeout() {
return Mono.fromCallable(() -> {
Thread.sleep(5000);
return "Result";
}).timeout(Duration.ofSeconds(3), Mono.just("Timeout"));
}
}
5. 测试最佳实践
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;
import java.time.Duration;
/**
 * StepVerifier-based testing patterns for Reactor publishers.
 */
public class TestingBestPractices {

    /** 1. Exact verification of a Mono with StepVerifier. */
    @Test
    public void testMonoWithStepVerifier() {
        Mono<String> mono = Mono.just("Test");
        StepVerifier.create(mono)
            .expectNext("Test")
            .expectComplete()
            .verify();
    }

    /**
     * 2. Virtual time for time-based operators. The publisher is created inside
     * the supplier so that it picks up the virtual scheduler which
     * {@code withVirtualTime} installs and removes again.
     */
    @Test
    public void testWithVirtualTime() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
            .expectSubscription()
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(0L)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(1L)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(2L)
            .expectComplete()
            .verify();
    }

    /** 3. Error scenario. */
    @Test
    public void testErrorHandling() {
        Mono<String> mono = Mono.error(new RuntimeException("Test error"));
        StepVerifier.create(mono)
            .expectError(RuntimeException.class)
            .verify();
    }

    /**
     * 4. Backpressure: only 10 of the 1000 elements are requested. The source can
     * never complete with that demand, so the test cancels after the 10 elements
     * instead of waiting for a completion signal.
     */
    @Test
    public void testBackpressure() {
        Flux<Integer> flux = Flux.range(1, 1000);
        StepVerifier.create(flux, 10) // initial request of 10 elements
            .expectNextCount(10)
            .thenCancel()
            .verify();
    }
}
6. 监控和日志最佳实践
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
/**
 * Logging and lightweight monitoring patterns for Reactor pipelines.
 * {@code processItem} is expected to be provided by the surrounding application code.
 */
public class MonitoringAndLoggingBestPractices {

    private static final Logger logger = LoggerFactory.getLogger(MonitoringAndLoggingBestPractices.class);

    /** 1. doOnNext: progress logging every tenth element. */
    public Flux<String> processWithProgressLogging() {
        return Flux.range(1, 100)
            .doOnNext(i -> {
                if (i % 10 == 0) {
                    logger.info("Processed {} items", i);
                }
            })
            .flatMap(this::processItem);
    }

    /** 2. doOnComplete: logs successful completion. */
    public Mono<Void> processWithCompletionLogging() {
        return Flux.range(1, 100)
            .flatMap(this::processItem)
            .doOnComplete(() -> logger.info("Processing completed successfully"))
            .then();
    }

    /**
     * 3. doOnError: logs the failure; the error still propagates. The explicit
     * type argument is required because the callable never returns a value.
     */
    public Mono<String> processWithErrorLogging() {
        return Mono.<String>fromCallable(() -> {
            throw new RuntimeException("Test error");
        }).doOnError(error -> logger.error("Processing failed", error));
    }

    /**
     * 4. Context: {@code contextWrite} is visible only to operators above it in
     * the chain, so the value is read with {@code deferContextual} and written last.
     */
    public Mono<String> processWithContext() {
        return Mono.deferContextual(contextView -> {
                String requestId = contextView.get("requestId");
                logger.info("Processing request: {}", requestId);
                return Mono.fromCallable(() -> "Result");
            })
            .contextWrite(context -> context.put("requestId", "12345"));
    }

    /**
     * 5. Timing metric: the clock starts when the stream is subscribed, not when
     * the method is called, so each subscription is measured on its own.
     */
    public Flux<String> processWithMetrics() {
        return Flux.defer(() -> {
            long startNanos = System.nanoTime();
            return Flux.range(1, 100)
                .flatMap(this::processItem)
                .doOnComplete(() -> {
                    long duration = (System.nanoTime() - startNanos) / 1_000_000;
                    logger.info("Processed 100 items in {} ms", duration);
                });
        });
    }
}
7. 安全最佳实践
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class SecurityBestPractices {
// 1. 使用 @PreAuthorize 进行方法级安全控制
@PreAuthorize("hasRole('ADMIN')")
public Flux<User> getAllUsers() {
return userRepository.findAll();
}
// 2. 使用参数化查询防止 SQL 注入
public Mono<User> findByUsername(String username) {
// 使用参数化查询而不是字符串拼接
return userRepository.findByUsername(username);
}
// 3. 验证输入参数
public Mono<User> createUser(User user) {
// 验证用户输入
if (user.getUsername() == null || user.getUsername().isEmpty()) {
return Mono.error(new ValidationException("Username is required"));
}
if (user.getEmail() == null || !isValidEmail(user.getEmail())) {
return Mono.error(new ValidationException("Invalid email format"));
}
return userRepository.save(user);
}
// 4. 使用 switchIfEmpty 防止信息泄露
public Mono<User> getUserById(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found")));
}
// 5. 限制返回数据量
public Flux<User> getUsers(int page, int size) {
// 限制返回的数据量,防止大量数据传输
if (size > 100) {
size = 100;
}
return userRepository.findAll()
.skip(page * size)
.take(size);
}
private boolean isValidEmail(String email) {
// 邮箱验证逻辑
return email != null && email.matches("^[A-Za-z0-9+_.-]+@.*\\.[A-Za-z]{2,}$");
}
}
8. 响应式编程常见陷阱
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Schedulers;
import java.time.Duration;
public class CommonPitfalls {
// ❌ 错误:在 subscribe 中阻塞
public void wrongBlockingInSubscribe() {
Flux.range(1, 10)
.subscribe(i -> {
try {
Thread.sleep(1000); // 阻塞操作
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// ✅ 正确:使用 subscribeOn 在单独线程执行
public void correctNonBlockingSubscribe() {
Flux.range(1, 10)
.flatMap(i -> Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}).subscribeOn(Schedulers.boundedElastic()))
.subscribe();
}
// ❌ 错误:忘记订阅
public void wrongForgettingToSubscribe() {
Mono.just("Hello")
.map(String::toUpperCase); // 没有订阅,不会执行
}
// ✅ 正确:记得订阅
public void correctWithSubscribe() {
Mono.just("Hello")
.map(String::toUpperCase)
.subscribe(System.out::println);
}
// ❌ 错误:在响应式流中使用阻塞操作
public Flux<String> wrongBlockingInStream() {
return Flux.range(1, 10)
.map(i -> {
try {
Thread.sleep(1000); // 阻塞操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Item " + i;
});
}
// ✅ 正确:使用异步操作
public Flux<String> correctAsyncInStream() {
return Flux.range(1, 10)
.flatMap(i -> Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Item " + i;
}).subscribeOn(Schedulers.boundedElastic()));
}
// ❌ 错误:过度使用 flatMap 导致并发过高
public Flux<String> wrongExcessiveFlatMap() {
return Flux.range(1, 10000)
.flatMap(i -> Mono.fromCallable(() -> processItem(i))); // 并发度可能过高
}
// ✅ 正确:控制并发度
public Flux<String> correctControlledConcurrency() {
return Flux.range(1, 10000)
.flatMap(i ->
Mono.fromCallable(() -> processItem(i))
.subscribeOn(Schedulers.boundedElastic()),
10 // 限制并发度为 10
);
}
// ❌ 错误:忽略错误处理
public Flux<String> wrongIgnoringErrors() {
return Flux.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error");
}
return "Item " + i;
}); // 没有错误处理,流会终止
}
// ✅ 正确:添加错误处理
public Flux<String> correctWithErrorHandling() {
return Flux.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error");
}
return "Item " + i;
})
.onErrorResume(error -> Flux.just("Error: " + error.getMessage()));
}
}
9. 响应式编程设计模式
观察者模式(Observer Pattern)
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
/**
 * Observer pattern on top of a multicast Reactor sink: publishers push strings
 * into the sink, subscribers receive them through its Flux view.
 */
public class ObserverPattern {

    private final Sinks.Many<String> eventSink;

    public ObserverPattern() {
        this.eventSink = Sinks.many().multicast().onBackpressureBuffer();
    }

    /**
     * Publisher side: offers the event to the sink. The result of
     * {@code tryEmitNext} is not inspected.
     *
     * @param event event payload
     */
    public void publishEvent(String event) {
        eventSink.tryEmitNext(event);
    }

    /**
     * Subscriber side.
     *
     * @return the sink exposed as a Flux
     */
    public Flux<String> subscribeToEvents() {
        Flux<String> events = eventSink.asFlux();
        return events;
    }
}
策略模式(Strategy Pattern)
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.util.Locale;

/**
 * Strategy pattern with reactive return types: each strategy transforms a string
 * and returns the result as a {@link Mono}.
 */
public class StrategyPattern {

    /** A single text transformation. */
    public interface ProcessingStrategy {
        Mono<String> process(String input);
    }

    /** Upper-cases the input, independently of the JVM's default locale. */
    public static class UppercaseStrategy implements ProcessingStrategy {
        @Override
        public Mono<String> process(String input) {
            return Mono.just(input.toUpperCase(Locale.ROOT));
        }
    }

    /** Lower-cases the input, independently of the JVM's default locale. */
    public static class LowercaseStrategy implements ProcessingStrategy {
        @Override
        public Mono<String> process(String input) {
            return Mono.just(input.toLowerCase(Locale.ROOT));
        }
    }

    /** Reverses the input. */
    public static class ReverseStrategy implements ProcessingStrategy {
        @Override
        public Mono<String> process(String input) {
            return Mono.just(new StringBuilder(input).reverse().toString());
        }
    }

    /**
     * Strategy factory.
     *
     * @param type one of {@code "uppercase"}, {@code "lowercase"}, {@code "reverse"}
     * @return a new strategy instance for {@code type}
     * @throws IllegalArgumentException if {@code type} is null or not a known name
     */
    public static ProcessingStrategy getStrategy(String type) {
        if (type == null) {
            // switching on a null string would throw a NullPointerException
            throw new IllegalArgumentException("Unknown strategy: " + type);
        }
        switch (type) {
            case "uppercase":
                return new UppercaseStrategy();
            case "lowercase":
                return new LowercaseStrategy();
            case "reverse":
                return new ReverseStrategy();
            default:
                throw new IllegalArgumentException("Unknown strategy: " + type);
        }
    }
}
责任链模式(Chain of Responsibility)
import reactor.core.publisher.Mono;
/**
 * Chain of responsibility with reactive return types. Each handler either
 * answers the request itself or passes it to the next handler.
 */
public class ChainOfResponsibility {

    /** One link of the chain. */
    public interface Handler {
        Mono<String> handle(String request);
    }

    /** Answers every request that starts with {@code "auth:"}; delegates the rest. */
    public static class AuthenticationHandler implements Handler {
        private final Handler next;

        public AuthenticationHandler(Handler next) {
            this.next = next;
        }

        @Override
        public Mono<String> handle(String request) {
            if (request.startsWith("auth:")) {
                return Mono.just("Authenticated: " + request.substring(5));
            }
            return next.handle(request);
        }
    }

    /** Answers every request that starts with {@code "auth:admin"}; delegates the rest. */
    public static class AuthorizationHandler implements Handler {
        private final Handler next;

        public AuthorizationHandler(Handler next) {
            this.next = next;
        }

        @Override
        public Mono<String> handle(String request) {
            if (request.startsWith("auth:admin")) {
                return Mono.just("Authorized: " + request);
            }
            return next.handle(request);
        }
    }

    /** Terminal handler: prints the request and answers it. */
    public static class LoggingHandler implements Handler {
        @Override
        public Mono<String> handle(String request) {
            System.out.println("Logging request: " + request);
            return Mono.just("Logged: " + request);
        }
    }

    /**
     * Builds the chain authorization → authentication → logging.
     *
     * <p>The handler with the more specific prefix ({@code "auth:admin"}) has to
     * come first: every such request also starts with {@code "auth:"}, so with the
     * authentication handler in front the authorization branch could never run.
     *
     * @return the head of the chain
     */
    public static Handler buildChain() {
        Handler loggingHandler = new LoggingHandler();
        Handler authenticationHandler = new AuthenticationHandler(loggingHandler);
        Handler authorizationHandler = new AuthorizationHandler(authenticationHandler);
        return authorizationHandler;
    }
}
通过遵循这些最佳实践,可以构建高性能、可维护、可靠的响应式应用程序。响应式编程虽然有一定的学习曲线,但掌握这些技巧后,能够充分发挥其优势,构建出卓越的应用系统。
12. 常见问题和解决方案
12.1 基础问题
Q1: 为什么我的响应式代码没有执行?
问题描述:创建了 Mono 或 Flux,但代码没有执行任何操作。
原因:响应式流是懒加载的,只有在订阅时才会开始执行。
解决方案:
// ❌ Wrong: nothing subscribes to this chain
Mono.just("Hello")
.map(String::toUpperCase);
// ✅ Correct: add a subscription
Mono.just("Hello")
.map(String::toUpperCase)
.subscribe(System.out::println);
// ✅ Correct: in WebFlux the framework subscribes to the returned publisher
/**
 * Builds the upper-cased greeting; the method itself does not subscribe.
 *
 * @return a Mono that maps "Hello" through String.toUpperCase
 */
@GetMapping("/hello")
public Mono<String> hello() {
    Mono<String> greeting = Mono.just("Hello");
    return greeting.map(String::toUpperCase);
}
Q2: 如何在响应式流中处理阻塞操作?
问题描述:在响应式流中调用阻塞方法导致整个应用被阻塞。
原因:WebFlux 默认在数量很少的事件循环线程上执行响应式流;在其中调用阻塞方法会占住该线程,分配给这个线程的其他请求在此期间都无法被处理。
解决方案:
// ❌ Wrong: blocking directly inside an operator
Flux.range(1, 10)
.map(i -> {
try {
Thread.sleep(1000); // blocking call
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
});
// ✅ Correct: wrap the blocking call in a Mono and subscribe it on boundedElastic
Flux.range(1, 10)
.flatMap(i -> Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}).subscribeOn(Schedulers.boundedElastic()));
// ✅ Correct: for database access, use a reactive driver
/**
 * Repository for User entities keyed by Long; it extends R2dbcRepository and
 * declares no methods of its own.
 */
@Repository
public interface UserRepository extends R2dbcRepository<User, Long> {
// Reactive database operations; no manual handling of blocking calls is needed here.
}
// ✅ Correct: when a blocking API cannot be avoided
/**
 * Defers the call to blockingService.call() until subscription and runs it on
 * Schedulers.boundedElastic().
 *
 * @return a Mono emitting the value returned by the blocking call
 */
public Mono<String> callBlockingApi() {
    Mono<String> blockingCall = Mono.fromCallable(() -> blockingService.call());
    return blockingCall.subscribeOn(Schedulers.boundedElastic());
}
Q3: 为什么我的 Mono/Flux 只执行一次?
问题描述:多次订阅同一个 Mono/Flux,但只执行了一次。
原因:默认情况下,Mono/Flux 是冷序列,每次订阅都会重新执行源逻辑。但如果使用了 cache()、share()、publish() 等操作符,结果会被缓存或在多个订阅者之间共享,源逻辑就只会执行一次。
解决方案:
// Cold sequence: the callable runs again for every subscription
Mono<String> coldMono = Mono.fromCallable(() -> {
System.out.println("Executing...");
return "Result";
});
coldMono.subscribe(); // prints: Executing...
coldMono.subscribe(); // prints: Executing...
// Cached sequence (called "hot" in the original text): cache() stores the result,
// so repeated subscriptions run the callable only once
Mono<String> hotMono = Mono.fromCallable(() -> {
System.out.println("Executing...");
return "Result";
}).cache();
hotMono.subscribe(); // prints: Executing...
hotMono.subscribe(); // prints nothing
// If every subscription must execute the source, do not use cache
// If the result should be cached, use cache()
12.2 错误处理问题
Q4: 如何处理响应式流中的异常?
问题描述:响应式流中的异常导致整个流终止,无法继续处理。
解决方案:
// Option 1: provide a fallback with onErrorResume
/**
 * Looks up the user's name and substitutes a fallback text when the lookup fails.
 *
 * @param id user id
 * @return the user's name, "Default User" when the user is missing, or an error
 *         description for any other failure
 */
public Mono<String> getUserWithFallback(Long id) {
    return userRepository.findById(id)
        .map(User::getName) // the method returns a name, not the entity
        .onErrorResume(UserNotFoundException.class, ex ->
            // handling for this specific exception
            Mono.just("Default User"))
        .onErrorResume(Exception.class, ex ->
            // handling for any other exception
            Mono.just("Error: " + ex.getMessage()));
}
// Option 2: return a default value with onErrorReturn
/**
 * Streams all user names; on failure the stream ends with one default element.
 * {@code onErrorReturn} takes exactly one fallback value (optionally preceded by
 * an exception type or predicate).
 *
 * @return the user names, or "Unknown User" when the query fails
 */
public Flux<String> getAllUsersWithDefault() {
    return userRepository.findAll()
        .map(User::getName)
        .onErrorReturn("Unknown User");
}
// Option 3: retry
/**
 * Looks the user up and re-subscribes according to a backoff retry spec
 * (3 attempts, 100 ms minimum backoff) that is filtered to TimeoutException
 * and NetworkException.
 *
 * @param id identifier passed to the repository lookup
 * @return the lookup with the retry spec applied
 */
public Mono<User> getUserWithRetry(Long id) {
    Retry transientFailuresOnly = Retry.backoff(3, Duration.ofMillis(100))
        .filter(failure ->
            failure instanceof TimeoutException
                || failure instanceof NetworkException);
    return userRepository.findById(id).retryWhen(transientFailuresOnly);
}
// Option 4: skip failing elements and keep processing the rest
/**
 * Processes items 1..10; a failing item is logged and skipped.
 *
 * <p>The recovery sits inside the flatMap function instead of using
 * {@code onErrorContinue}, which Reactor documents as a specialist operator
 * whose effect depends on upstream operator support. {@code Mono.defer} also
 * captures exceptions thrown while the inner publisher is being created.
 *
 * @return the results of all items that were processed successfully
 */
public Flux<String> processWithErrorContinue() {
    return Flux.range(1, 10)
        .flatMap(i -> Mono.defer(() -> processItem(i))
            .onErrorResume(error -> {
                logger.error("Error processing item {}: {}", i, error.getMessage());
                return Mono.empty();
            }));
}
Q5: 如何捕获并记录所有错误?
问题描述:需要记录响应式流中发生的所有错误,包括中间步骤的错误。
解决方案:
/**
 * Processes items 1..10 with logging at three levels: each item before it is
 * processed, each per-item failure, and the stream-level error or completion.
 *
 * @return a Mono that signals only the termination of the stream
 */
public Mono<Void> processWithFullErrorLogging() {
    Flux<Integer> announcedItems = Flux.range(1, 10)
        .doOnNext(i -> logger.info("Processing item: {}", i));
    return announcedItems
        .flatMap(i -> processItem(i).doOnError(error ->
            logger.error("Error processing item {}: {}", i, error.getMessage())))
        .doOnError(error -> logger.error("Stream processing failed: {}", error.getMessage()))
        .doOnComplete(() -> logger.info("Stream processing completed successfully"))
        .then();
}
// Pass a request id through the Reactor context so failures can be traced
/**
 * Attaches a random request id to the subscription context and includes it in
 * the error log.
 *
 * <p>The id is read from the signal's context view; an exception's suppressed
 * list does not carry it. {@code contextWrite} comes last because the context
 * is visible only to the operators above it.
 *
 * @return the result, with error logging that includes the request id
 */
public Mono<String> processWithContext() {
    return Mono.fromCallable(() -> "Result")
        .doOnEach(signal -> {
            if (signal.isOnError()) {
                String requestId = signal.getContextView().getOrDefault("requestId", "unknown");
                logger.error("Request {} failed: {}", requestId, signal.getThrowable().getMessage());
            }
        })
        .contextWrite(context -> context.put("requestId", UUID.randomUUID().toString()));
}
12.3 背压问题
Q6: 如何处理快速生产者和慢速消费者?
问题描述:生产者产生数据的速度快于消费者处理速度,导致内存溢出或系统崩溃。
解决方案:
// Option 1: buffer with onBackpressureBuffer
/**
 * 10 ms interval, buffered with a capacity argument of 100, each tick mapped
 * through processItem; subscribed on Schedulers.boundedElastic().
 */
public Flux<String> processWithBuffer() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(10));
    Flux<Long> buffered = ticks.onBackpressureBuffer(100);
    return buffered
        .flatMap(this::processItem)
        .subscribeOn(Schedulers.boundedElastic());
}
// Option 2: drop surplus elements with onBackpressureDrop
/**
 * 10 ms interval whose dropped ticks are logged at warn level; the remaining
 * ticks are mapped through processItem.
 */
public Flux<String> processWithDrop() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(10));
    Flux<Long> lossy = ticks.onBackpressureDrop(dropped -> logger.warn("Dropped item: {}", dropped));
    return lossy.flatMap(this::processItem);
}
// Option 3: keep only the latest value with onBackpressureLatest
/**
 * 10 ms interval with onBackpressureLatest applied, each tick mapped through processItem.
 */
public Flux<String> processWithLatest() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(10));
    Flux<Long> latestOnly = ticks.onBackpressureLatest();
    return latestOnly.flatMap(this::processItem);
}
// Option 4: bound the upstream demand with limitRate
/**
 * 10 ms interval with {@code limitRate(100)}, each tick mapped through processItem.
 *
 * <p>{@code limitRate} caps the size of the request batches sent upstream (at
 * most 100 outstanding elements per batch). It is not a time-based throttle and
 * does not slow the source down.
 */
public Flux<String> processWithRateLimit() {
    return Flux.interval(Duration.ofMillis(10))
        .limitRate(100)
        .flatMap(this::processItem);
}
// Option 5: down-sample with sample
/**
 * 10 ms interval sampled with a 100 ms period, each sampled tick mapped through processItem.
 */
public Flux<String> processWithSampling() {
    Duration samplingPeriod = Duration.ofMillis(100);
    Flux<Long> sampled = Flux.interval(Duration.ofMillis(10)).sample(samplingPeriod);
    return sampled.flatMap(this::processItem);
}
Q7: 如何调试背压问题?
问题描述:不确定是否发生了背压问题,需要调试。
解决方案:
/**
 * Instruments a 10 ms interval stream with logging callbacks for the request,
 * next, cancel, complete and error signals, then hands each tick to processItem.
 *
 * @return the instrumented stream
 */
public Flux<String> debugBackpressure() {
return Flux.interval(Duration.ofMillis(10))
.doOnRequest(request -> {
logger.info("Requested {} items", request);
})
.doOnNext(item -> {
logger.info("Emitted item: {}", item);
})
.doOnCancel(() -> {
logger.warn("Stream was cancelled");
})
.doOnComplete(() -> {
logger.info("Stream completed");
})
.doOnError(error -> {
logger.error("Stream error: {}", error.getMessage());
})
.flatMap(this::processItem);
}
// Reactor debug mode
// Original hint: add the JVM argument -Dreactor.traceback=true at startup.
// NOTE(review): confirm that this property is recognised by the Reactor version in use;
// the switch Reactor documents for assembly tracebacks is Hooks.onOperatorDebug().
12.4 性能问题
Q8: 如何提高响应式应用的性能?
问题描述:响应式应用性能不如预期,需要优化。
解决方案:
// 1. Parallel processing
/**
 * Splits 1..1000 into parallel rails run on Schedulers.parallel(), maps each
 * value through heavyComputation, and merges the rails with sequential().
 */
public Flux<Integer> parallelProcessing() {
    Flux<Integer> source = Flux.range(1, 1000);
    return source.parallel()
        .runOn(Schedulers.parallel())
        .map(value -> heavyComputation(value))
        .sequential();
}
// 2. Bounded concurrency
/**
 * Maps 1..100 through processItem on Schedulers.boundedElastic(), passing 10 as
 * the flatMap concurrency argument.
 */
public Flux<String> controlledConcurrency() {
    final int maxConcurrency = 10;
    return Flux.range(1, 100)
        .flatMap(
            i -> Mono.fromCallable(() -> processItem(i)).subscribeOn(Schedulers.boundedElastic()),
            maxConcurrency);
}
// 3. Caching
/** One cached Mono per key, so repeated calls share the same cached result. */
private final java.util.concurrent.ConcurrentMap<String, Mono<String>> cachedLookups =
    new java.util.concurrent.ConcurrentHashMap<>();

/**
 * Returns the cached lookup for {@code key}, creating it on first use.
 *
 * <p>{@code cache()} only helps when the same Mono instance is subscribed
 * again, so the cached Mono is kept per key instead of being rebuilt on every
 * call. The map is unbounded and a cached error is replayed too; add eviction
 * for production use.
 *
 * @param key lookup key
 * @return the cached result for {@code key}
 */
public Mono<String> getCachedData(String key) {
    return cachedLookups.computeIfAbsent(key, k -> fetchDataFromDatabase(k).cache());
}
// 4. Connection pooling
/**
 * Builds a WebClient for https://api.example.com on top of a "custom"
 * connection provider (500 max connections, 20 s max idle time).
 */
@Bean
public WebClient optimizedWebClient() {
    HttpClient httpClient = HttpClient.create(
        ConnectionProvider.builder("custom")
            .maxConnections(500)
            .maxIdleTime(Duration.ofSeconds(20))
            .build());
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
    return WebClient.builder()
        .clientConnector(connector)
        .baseUrl("https://api.example.com")
        .build();
}
// 5. Avoid unnecessary work
/**
 * Keeps only the even values of 1..1000 and maps those through processItem,
 * so the filter runs before the mapping.
 */
public Flux<String> optimizedStream() {
    Flux<Integer> evens = Flux.range(1, 1000).filter(i -> i % 2 == 0);
    return evens.map(this::processItem);
}
Q9: 如何避免内存泄漏?
问题描述:响应式应用出现内存泄漏,内存占用持续增长。
解决方案:
// 1. Bound infinite streams with take
/**
 * 100 ms interval limited to its first 100 elements.
 */
public Flux<Long> limitedStream() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(100));
    return ticks.take(100);
}
// 2. Use timeout to avoid waiting indefinitely
/**
 * Runs a 5-second blocking call with a 3-second timeout and a fallback value.
 * The blocking call runs on a bounded-elastic worker so the subscribing thread
 * is never held for the full 5 seconds.
 *
 * @return "Result", or "Timeout" when the call takes longer than 3 seconds
 */
public Mono<String> withTimeout() {
    return Mono.fromCallable(() -> {
            Thread.sleep(5000);
            return "Result";
        })
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(Duration.ofSeconds(3), Mono.just("Timeout"));
}
// 3. Release resources reliably
/**
 * Acquires a resource per subscription, streams the processing result, and
 * releases the resource on completion, error or cancellation. The method
 * returns a Flux, so {@code Flux.using} is used rather than {@code Mono.using}.
 *
 * @return the values produced by processWithResource
 */
public Flux<String> processWithResourceCleanup() {
    return Flux.using(
        () -> acquireResource(),
        resource -> processWithResource(resource),
        this::releaseResource
    );
}
// 4. Guarantee cleanup with doFinally
/**
 * Maps 1..10 through processItem and calls cleanupResources() from doFinally.
 *
 * @return a Mono that signals only the termination of the stream
 */
public Mono<Void> processWithCleanup() {
    Flux<Integer> items = Flux.range(1, 10);
    return items
        .flatMap(item -> processItem(item))
        .doFinally(signalType -> cleanupResources())
        .then();
}
// 5. Do not retain references inside doOnNext/doOnError
/**
 * Maps 1..1000 through processItem and prints every result; the callback only
 * prints the element and stores no reference to it.
 */
public Flux<String> avoidMemoryLeak() {
    Flux<Integer> source = Flux.range(1, 1000);
    return source
        .flatMap(this::processItem)
        .doOnNext(item -> {
            // do not keep a reference to item here
            System.out.println(item);
        });
}
12.5 线程和调度问题
Q10: 如何正确使用 Scheduler?
问题描述:不清楚何时使用哪个 Scheduler,导致性能问题或线程安全问题。
解决方案:
// Schedulers.parallel(): CPU-bound work
/**
 * Splits 1..1000 into parallel rails on Schedulers.parallel(), maps each value
 * through heavyComputation, and merges the rails with sequential().
 */
public Flux<Integer> cpuIntensiveTask() {
    Flux<Integer> source = Flux.range(1, 1000);
    return source.parallel()
        .runOn(Schedulers.parallel()) // parallel scheduler
        .map(this::heavyComputation)
        .sequential();
}
// Schedulers.boundedElastic(): I/O-bound work
/**
 * Wraps ioOperation for each of 1..100 in a Mono subscribed on Schedulers.boundedElastic().
 */
public Flux<String> ioIntensiveTask() {
    Flux<Integer> source = Flux.range(1, 100);
    return source.flatMap(i ->
        Mono.fromCallable(() -> ioOperation(i))
            .subscribeOn(Schedulers.boundedElastic())); // elastic scheduler
}
// Schedulers.single(): sequential single-threaded execution
/**
 * Publishes 1..10 on Schedulers.single() and doubles each value.
 */
public Flux<Integer> singleThreadTask() {
    Flux<Integer> onSingleThread = Flux.range(1, 10)
        .publishOn(Schedulers.single()); // single-thread scheduler
    return onSingleThread.map(i -> i * 2);
}
// Schedulers.immediate(): run on the current thread
/**
 * Emits "Hello" with publishOn(Schedulers.immediate()) applied.
 */
public Mono<String> immediateExecution() {
    Mono<String> greeting = Mono.just("Hello");
    return greeting.publishOn(Schedulers.immediate());
}
// Custom thread pool
/**
 * Processes 1..100 on a dedicated fixed pool of 10 threads.
 *
 * <p>The pool is created per subscription and shut down when the stream
 * completes, fails or is cancelled, so repeated calls do not accumulate idle
 * thread pools.
 *
 * @return the processed items
 */
public Flux<String> customThreadPool() {
    return Flux.<String, ExecutorService>using(
        () -> Executors.newFixedThreadPool(10),
        executor -> Flux.range(1, 100)
            .flatMap(i ->
                Mono.fromCallable(() -> processItem(i))
                    .subscribeOn(Schedulers.fromExecutor(executor))),
        ExecutorService::shutdown
    );
}
Q11: 如何避免线程安全问题?
问题描述:在响应式流中访问共享状态导致线程安全问题。
解决方案:
// ❌ Wrong: unsynchronized shared mutable state
/**
 * Counter-example: every rail increments the same plain int slot, so updates
 * can be lost and the emitted numbers may repeat.
 */
public Flux<Integer> wrongSharedState() {
    int[] counter = {0};
    return Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(i -> ++counter[0]) // data race: not thread-safe
        .sequential();
}
// ✅ Correct: use a thread-safe data structure
/**
 * Shares an {@code AtomicInteger} between the rails; each increment is atomic,
 * so every value from 1 to 100 is emitted exactly once (in no particular order).
 */
public Flux<Integer> correctThreadSafeState() {
    AtomicInteger counter = new AtomicInteger(0);
    return Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(i -> counter.incrementAndGet()) // AtomicInteger is thread-safe
        .sequential();
}
// ✅ Correct: avoid shared state altogether
/**
 * Each element is transformed on its own; nothing is shared between rails.
 * {@code sequential()} merges the rails back into the Flux the method returns.
 */
public Flux<Integer> avoidSharedState() {
    return Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(i -> i + 1) // every element is processed independently
        .sequential();
}
// ✅ Correct: carry per-request data in the Reactor context
/**
 * Reads "requestId" from the subscriber context and embeds it in the emitted
 * text; a random UUID is written under that key by contextWrite.
 */
public Mono<String> useContext() {
    Mono<String> contextual = Mono.deferContextual(contextView -> {
        String requestId = contextView.get("requestId");
        return Mono.just("Processing request: " + requestId);
    });
    return contextual.contextWrite(context -> context.put("requestId", UUID.randomUUID().toString()));
}
12.6 测试问题
Q12: 如何测试响应式代码?
问题描述:不知道如何正确测试响应式代码。
解决方案:
// 1. Test a Mono with StepVerifier
/** Expects the single value "Hello" followed by completion. */
@Test
public void testMono() {
    StepVerifier.create(Mono.just("Hello"))
        .expectNext("Hello")
        .verifyComplete();
}
// 2. Test a Flux with StepVerifier
/** Expects the values 1 to 5 in order, followed by completion. */
@Test
public void testFlux() {
    StepVerifier.create(Flux.range(1, 5))
        .expectNext(1, 2, 3, 4, 5)
        .verifyComplete();
}
// 3. Test the error path
/** Expects the Mono to terminate with a RuntimeException. */
@Test
public void testError() {
    Mono<String> failing = Mono.error(new RuntimeException("Test error"));
    StepVerifier.create(failing)
        .verifyError(RuntimeException.class);
}
// 4. Use virtual time for time-based operators
/**
 * Verifies three one-second ticks without waiting in real time. The publisher
 * is created inside the supplier so that it picks up the virtual scheduler
 * which {@code withVirtualTime} installs and removes again.
 */
@Test
public void testWithVirtualTime() {
    StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(0L)
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(1L)
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(2L)
        .expectComplete()
        .verify();
}
// 5. Test WebFlux endpoints with WebTestClient
/**
 * Spring Boot test class that gets a WebTestClient injected.
 */
@SpringBootTest
@AutoConfigureWebTestClient
public class WebFluxTest {
@Autowired
private WebTestClient webTestClient;
/**
 * Issues GET /api/users/1 and expects status 200 with a body that decodes to User.
 */
@Test
public void testGetUser() {
webTestClient.get()
.uri("/api/users/1")
.exchange()
.expectStatus().isOk()
.expectBody(User.class);
}
}
12.7 数据库问题
Q13: 如何在响应式应用中使用事务?
问题描述:需要在响应式应用中管理数据库事务。
解决方案:
// 1. Declarative transactions
/**
 * Service using {@code @Transactional} on a reactive method. The repositories
 * are expected to be provided by the surrounding application code.
 */
@Service
public class UserService {

    /**
     * Saves the user and then the order that references it, in one transaction.
     *
     * <p>{@code savedUser} is only in scope inside the flatMap function, so the
     * user is returned from there once the order has been saved.
     *
     * @param user  user to create
     * @param order order to attach to the created user
     * @return the saved user
     */
    @Transactional
    public Mono<User> createUserWithOrder(User user, Order order) {
        return userRepository.save(user)
            .flatMap(savedUser -> {
                order.setUserId(savedUser.getId());
                return orderRepository.save(order).thenReturn(savedUser);
            });
    }
}
// 2. Programmatic transaction
@Service
public class TransactionService {

    private final TransactionalOperator rxtx;

    /**
     * @param rxtx operator used to wrap reactive pipelines in a transaction
     */
    public TransactionService(TransactionalOperator rxtx) {
        // A blank final field must be assigned in a constructor, otherwise the class does not compile.
        this.rxtx = rxtx;
    }

    /**
     * Moves {@code amount} from one user's balance to another's, saving both users
     * inside a single transaction.
     *
     * <p>If either user lookup completes empty, nothing is saved and the returned
     * Mono simply completes.
     *
     * @param fromId id of the user to debit
     * @param toId   id of the user to credit
     * @param amount amount to transfer
     * @return a Mono that completes when the transactional pipeline has finished
     */
    // NOTE(review): Double is lossy for monetary amounts and no balance check is
    // performed here — confirm whether BigDecimal and a funds check are required.
    public Mono<Void> transferMoney(Long fromId, Long toId, Double amount) {
        return userRepository.findById(fromId)
            .flatMap(fromUser -> userRepository.findById(toId)
                .flatMap(toUser -> {
                    fromUser.setBalance(fromUser.getBalance() - amount);
                    toUser.setBalance(toUser.getBalance() + amount);
                    return userRepository.save(fromUser)
                        .then(userRepository.save(toUser));
                })
            )
            .as(rxtx::transactional) // run the whole pipeline in one transaction
            .then();
    }
}
// 3. Multi-step operation under one declarative transaction
/**
 * Saves the user, then the order linked to that user, then processes the payment
 * linked to that order. The only transaction boundary in this method is the
 * {@code @Transactional} annotation on it.
 *
 * @param user    the user to persist
 * @param order   the order to persist; receives the saved user's id
 * @param payment the payment to process; receives the saved order's id
 * @return a Mono that completes when all three steps have finished
 */
@Transactional
public Mono<Void> complexOperation(User user, Order order, Payment payment) {
    return userRepository.save(user)
        .doOnNext(persistedUser -> order.setUserId(persistedUser.getId()))
        .flatMap(persistedUser -> orderRepository.save(order))
        .doOnNext(persistedOrder -> payment.setOrderId(persistedOrder.getId()))
        .flatMap(persistedOrder -> processPayment(payment))
        .then();
}
Q14: 如何处理数据库连接池耗尽?
问题描述:高并发情况下数据库连接池耗尽。
解决方案:
// 1. 配置合适的连接池大小
@Configuration
public class R2dbcPoolConfig extends AbstractR2dbcConfiguration {
@Override
protected String connectionUrl() {
return "r2dbc:postgresql://localhost:5432/reactor_db" +
"?poolSize=50" + // 根据并发量调整
"&maxIdleTime=PT30S" +
"&maxLifeTime=PT10M";
}
}
// 2. Use backpressure to bound the rate of database reads
/**
 * Streams all users while requesting rows from the repository in bounded batches.
 *
 * <p>{@code limitRate(50)} caps each upstream request at 50 elements. No
 * {@code onBackpressureBuffer} is placed in front of it: that operator requests an
 * unbounded amount from its source and signals an overflow error once its buffer is
 * full, which would defeat the rate limit and fail the stream for slow consumers.
 *
 * @return a Flux of users fetched in batches of at most 50
 */
public Flux<User> getUsersWithBackpressure() {
    return userRepository.findAll()
        .limitRate(50);
}
// 3. Apply a timeout so a lookup cannot hold a connection for long
/**
 * Looks up a user by id, giving up after five seconds.
 *
 * @param id the user id
 * @return the user, or an empty Mono when the lookup finds nothing or times out
 */
public Mono<User> getUserWithTimeout(Long id) {
    Duration limit = Duration.ofSeconds(5);
    Mono<User> lookup = userRepository.findById(id);
    // On timeout the fallback is an empty Mono, so callers see "no result" rather than an error.
    return lookup.timeout(limit, Mono.empty());
}
// 4. Monitor connection pool usage
/** Periodically logs the active and idle connection counts. */
@Component
public class ConnectionPoolMonitor {

    /** Runs at a fixed rate of 60000 ms and logs the current pool figures. */
    @Scheduled(fixedRate = 60000)
    public void monitorPool() {
        Object active = getActiveConnections();
        logger.info("Active connections: {}", active);

        Object idle = getIdleConnections();
        logger.info("Idle connections: {}", idle);
    }
}
12.8 安全问题
Q15: 如何在响应式应用中实现安全控制?
问题描述:需要在响应式应用中实现认证和授权。
解决方案:
// 1. Method-level security
/** User queries guarded by {@code @PreAuthorize} expressions. */
@Service
public class SecureUserService {

    /**
     * Lists every user; guarded by {@code hasRole('ADMIN')}.
     *
     * @return all users from the repository
     */
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        Flux<User> everyone = userRepository.findAll();
        return everyone;
    }

    /**
     * Loads one user; guarded so that the id must equal the authenticated
     * principal's id unless the caller has the ADMIN role.
     *
     * @param id the requested user id
     * @return the matching user, if any
     */
    @PreAuthorize("#id == authentication.principal.id or hasRole('ADMIN')")
    public Mono<User> getOwnUser(Long id) {
        Mono<User> requested = userRepository.findById(id);
        return requested;
    }
}
// 2. JWT authentication
/**
 * Web filter that authenticates requests carrying an {@code Authorization: Bearer ...}
 * header and publishes the resulting authentication in the Reactor Context.
 */
@Component
public class JwtAuthenticationFilter implements WebFilter {

    /**
     * Continues the filter chain exactly once per request.
     *
     * <ul>
     *   <li>No bearer header: the chain continues unauthenticated.</li>
     *   <li>Authentication yields a result: the chain continues with that
     *       authentication written into the security context.</li>
     *   <li>Authentication completes empty: the chain continues unauthenticated.</li>
     * </ul>
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return a Mono that completes when downstream processing has finished
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String authHeader = exchange.getRequest().getHeaders().getFirst("Authorization");
        if (authHeader == null || !authHeader.startsWith("Bearer ")) {
            return chain.filter(exchange);
        }

        String token = authHeader.substring(7); // strip the "Bearer " prefix
        return authenticationManager.authenticate(
                new UsernamePasswordAuthenticationToken(null, token))
            .flatMap(authentication -> chain.filter(exchange)
                .contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication))
                // chain.filter yields Mono<Void>, which is always empty. Emit a marker
                // value so the switchIfEmpty below only fires when authenticate() was
                // empty; otherwise the chain would be executed a second time.
                .thenReturn(Boolean.TRUE))
            .switchIfEmpty(Mono.defer(() -> chain.filter(exchange).thenReturn(Boolean.FALSE)))
            .then();
    }
}
// 3. Input validation
/**
 * Validates the user's username and email before saving.
 *
 * @param user the user to validate and persist
 * @return the saved user, or a Mono failing with {@code ValidationException} when
 *         the username is null/empty or the email is null or rejected by
 *         {@code isValidEmail}
 */
public Mono<User> createUser(User user) {
    String username = user.getUsername();
    boolean usernamePresent = username != null && !username.isEmpty();
    if (!usernamePresent) {
        return Mono.error(new ValidationException("Username is required"));
    }

    String email = user.getEmail();
    boolean emailAcceptable = email != null && isValidEmail(email);
    if (!emailAcceptable) {
        return Mono.error(new ValidationException("Invalid email format"));
    }

    return userRepository.save(user);
}
// 4. Parameterized queries to prevent SQL injection
/** Reactive repository for {@code User} entities keyed by {@code Long}. */
public interface UserRepository extends R2dbcRepository<User, Long> {
// Derived query method: the username is passed as a method argument
// rather than being concatenated into query text.
Mono<User> findByUsername(String username);
// Do not build queries by string concatenation
// @Query("SELECT * FROM users WHERE username = '" + username + "'") // ❌ wrong
}
12.9 调试和监控
Q16: 如何调试响应式代码?
问题描述:响应式代码难以调试,需要找到问题所在。
解决方案:
// 1. Add logging with doOnNext / doOnError / doOnComplete
/**
 * Processes the integers 1..10 and logs each stage: every item before processing,
 * per-item processing failures, stream completion, and stream-level errors.
 *
 * @return the processed items
 */
public Flux<String> debugStream() {
    Flux<Integer> numbers = Flux.range(1, 10)
        .doOnNext(n -> logger.info("Processing item: {}", n));

    Flux<String> processed = numbers.flatMap(n ->
        processItem(n).doOnError(failure ->
            logger.error("Error processing item {}: {}", n, failure.getMessage())));

    return processed
        .doOnComplete(() -> logger.info("Stream completed"))
        .doOnError(failure -> logger.error("Stream error: {}", failure.getMessage()));
}
// 2. Mark a position in the pipeline with checkpoint
/**
 * Processes the integers 1..10 and tags the pipeline with the description
 * "After processing" at the point following the processing step.
 *
 * @return the processed items
 */
public Flux<String> withCheckpoint() {
    Flux<Integer> numbers = Flux.range(1, 10);
    return numbers
        .flatMap(n -> processItem(n))
        .checkpoint("After processing");
}
// 3. 使用 Reactor Debug 模式
// 启动时添加 JVM 参数: -Dreactor.trace.operatorStacktrace=true(等价于在代码中调用 Hooks.onOperatorDebug(),开销较大,建议仅在开发环境启用)
// 4. Use the log() operator
/**
 * Processes the integers 1..10, with a log() operator under the category
 * "debug-stream" placed directly after the source range.
 *
 * @return the processed items
 */
public Flux<String> withLog() {
    Flux<Integer> logged = Flux.range(1, 10).log("debug-stream");
    return logged.flatMap(n -> processItem(n));
}
// 5. Carry debugging information in the Reactor Context
/**
 * Logs the "debugId" context entry (falling back to "unknown") and emits "Result".
 *
 * <p>The entry is written by this method itself, using a freshly generated random UUID.
 *
 * @return a Mono emitting "Result"
 */
public Mono<String> debugWithContext() {
    Mono<String> traced = Mono.deferContextual(ctx -> {
        String id = ctx.getOrDefault("debugId", "unknown");
        logger.info("Processing with debug ID: {}", id);
        return Mono.just("Result");
    });
    return traced.contextWrite(ctx -> ctx.put("debugId", UUID.randomUUID().toString()));
}
Q17: 如何监控响应式应用?
问题描述:需要监控响应式应用的性能和健康状况。
解决方案:
// 1. Collect metrics with Micrometer
/**
 * Registers a request counter ("http.requests") and a response timer
 * ("http.response.time") and applies them to reactive handler results.
 */
@Component
public class MetricsCollector {

    private final MeterRegistry meterRegistry;
    private final Counter requestCounter;
    private final Timer responseTimer;

    /**
     * @param meterRegistry registry in which both meters are registered
     */
    public MetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.requestCounter = Counter.builder("http.requests")
            .description("Number of HTTP requests")
            .register(meterRegistry);
        this.responseTimer = Timer.builder("http.response.time")
            .description("HTTP response time")
            .register(meterRegistry);
    }

    /**
     * Counts the request and decorates {@code response} so its duration is recorded.
     *
     * <p>The clock starts when the returned Mono is subscribed, not when this method
     * is called, so assembly-to-subscription delay is excluded. The duration is
     * recorded on any termination (success, error or cancellation), so failed
     * requests are timed as well.
     *
     * @param request  the incoming request (not inspected by this method)
     * @param response the handler's response publisher
     * @return a Mono that behaves like {@code response} and records its duration
     */
    public Mono<ServerResponse> withMetrics(ServerRequest request, Mono<ServerResponse> response) {
        requestCounter.increment();
        return Mono.defer(() -> {
            // nanoTime is monotonic, unlike currentTimeMillis, so it suits interval measurement.
            long startNanos = System.nanoTime();
            return response.doFinally(signal ->
                responseTimer.record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS));
        });
    }
}
// 2. 使用 Reactor 的 hooks 监控
@Component
public class ReactorHooks implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 监控所有错误
Hooks.onOperatorDebug();
// 监控所有操作符
Hooks.onEachOperator(Operators.lift((sc, sub) -> {
// 自定义监控逻辑
return new CoreSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Object o) {
// 记录数据
sub.onNext(o);
}
@Override
public void onError(Throwable t) {
// 记录错误
sub.onError(t);
}
@Override
public void onComplete() {
// 记录完成
sub.onComplete();
}
};
}));
}
}
// 3. Expose a health-check endpoint built on the Actuator Health type
/** REST controller serving an aggregated health status at GET /health. */
@RestController
public class HealthController {

    /**
     * Checks the database and the external services and reports UP only when both
     * checks pass. Each check's outcome is attached as a detail ("OK" or "DOWN").
     *
     * <p>The checks run on the bounded-elastic scheduler.
     *
     * @return the aggregated health status
     */
    @GetMapping("/health")
    public Mono<Health> health() {
        return Mono.fromCallable(() -> {
                // Both probes are always evaluated, database first.
                boolean dbHealthy = checkDatabase();
                boolean externalHealthy = checkExternalServices();
                boolean allHealthy = dbHealthy && externalHealthy;

                return (allHealthy ? Health.up() : Health.down())
                    .withDetail("database", dbHealthy ? "OK" : "DOWN")
                    .withDetail("external", externalHealthy ? "OK" : "DOWN")
                    .build();
            })
            .subscribeOn(Schedulers.boundedElastic());
    }
}
12.10 迁移问题
Q18: 如何从传统 Spring MVC 迁移到 WebFlux?
问题描述:需要将现有的 Spring MVC 应用迁移到 WebFlux。
解决方案:
// 1. Controller return type change
// Traditional Spring MVC: the handler returns the User object directly.
/**
 * Handles GET /users/{id}.
 *
 * @param id the user id taken from the path
 * @return the user returned by userService.findById
 */
@GetMapping("/users/{id}")
public User getUser(@PathVariable Long id) {
return userService.findById(id);
}
// WebFlux: the handler returns a Mono<User> instead of a User.
/**
 * Handles GET /users/{id}.
 *
 * @param id the user id taken from the path
 * @return the Mono returned by userService.findById
 */
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findById(id);
}
// 2. Service layer change
// Traditional Spring MVC
@Service
public class UserService {

    /**
     * Loads a user synchronously.
     *
     * @param id the user id
     * @return the user, or {@code null} when no user has that id
     */
    public User findById(Long id) {
        // JpaRepository.findById returns Optional<User>; unwrap it to keep the
        // blocking signature. An absent value maps to null, mirroring the empty
        // Mono of the reactive variant.
        return userRepository.findById(id).orElse(null);
    }
}

// WebFlux
@Service
public class UserService {

    /**
     * Loads a user reactively.
     *
     * @param id the user id
     * @return a Mono emitting the user, or empty when no user has that id
     */
    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }
}

// 3. Repository layer change
// Traditional Spring Data JPA
public interface UserRepository extends JpaRepository<User, Long> {
    // Optional<User> findById(Long id) is inherited. Redeclaring it with return
    // type User would clash with the inherited signature and fail to compile.
}

// WebFlux with Spring Data R2DBC
public interface UserRepository extends R2dbcRepository<User, Long> {
    // Redundant with the inherited reactive findById; shown only to make the
    // Mono return type explicit.
    Mono<User> findById(Long id);
}
// 4. Configuration change
// Traditional Spring MVC
/** Spring Boot entry point. */
@SpringBootApplication
public class Application {
/** Starts the application by delegating to SpringApplication.run. */
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
// WebFlux
// The entry point is the same code as the Spring MVC variant.
// NOTE(review): presumably the web stack is selected by the starter
// dependencies on the classpath rather than by this class — confirm.
/** Spring Boot entry point. */
@SpringBootApplication
public class Application {
/** Starts the application by delegating to SpringApplication.run. */
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
// 5. Dependency change
// Traditional Spring MVC: web starter plus the Spring Data JPA starter
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
}
// WebFlux: webflux starter plus the Spring Data R2DBC starter
// NOTE(review): an R2DBC driver for the target database is presumably also
// required at runtime — confirm against the full build file.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
}
Q19: 如何逐步迁移到响应式编程?
问题描述:需要逐步迁移,而不是一次性重写整个应用。
解决方案:
// 1. Run Spring MVC and WebFlux side by side
// NOTE(review): this class contains no stack-specific configuration; which web
// stack is active presumably depends on the dependencies on the classpath — confirm.
/** Spring Boot entry point for the hybrid application. */
@SpringBootApplication
public class HybridApplication {
/** Starts the application by delegating to SpringApplication.run. */
public static void main(String[] args) {
SpringApplication.run(HybridApplication.class, args);
}
}
// 2. Use reactive components from Spring MVC
/** Blocking-style controller under /api/v1 that delegates to a reactive service. */
@RestController
@RequestMapping("/api/v1")
public class LegacyController {

    private final ReactiveUserService reactiveUserService;

    /**
     * @param reactiveUserService reactive service used to load users
     */
    public LegacyController(ReactiveUserService reactiveUserService) {
        // A blank final field must be assigned in a constructor, otherwise the class does not compile.
        this.reactiveUserService = reactiveUserService;
    }

    /**
     * Handles GET /api/v1/users/{id} by blocking on the reactive lookup.
     *
     * @param id the user id taken from the path
     * @return the user, or {@code null} when the lookup completes empty
     */
    @GetMapping("/users/{id}")
    public User getUser(@PathVariable Long id) {
        // Blocking call into the reactive service.
        return reactiveUserService.findById(id).block();
    }
}
/** Fully reactive controller under /api/v2. */
@RestController
@RequestMapping("/api/v2")
public class ReactiveController {

    private final ReactiveUserService reactiveUserService;

    /**
     * @param reactiveUserService reactive service used to load users
     */
    public ReactiveController(ReactiveUserService reactiveUserService) {
        // A blank final field must be assigned in a constructor, otherwise the class does not compile.
        this.reactiveUserService = reactiveUserService;
    }

    /**
     * Handles GET /api/v2/users/{id} without blocking.
     *
     * @param id the user id taken from the path
     * @return the Mono returned by the reactive service
     */
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        // Fully reactive: the Mono is returned to the framework unsubscribed.
        return reactiveUserService.findById(id);
    }
}
// 3. Use the adapter pattern
/**
 * Adapter exposing a blocking API on top of {@code ReactiveUserService}.
 * Every call waits at most five seconds for the reactive result.
 */
public class BlockingUserServiceAdapter {

    private final ReactiveUserService reactiveUserService;

    /**
     * @param reactiveUserService the reactive service to adapt
     */
    public BlockingUserServiceAdapter(ReactiveUserService reactiveUserService) {
        // A blank final field must be assigned in a constructor, otherwise the class does not compile.
        this.reactiveUserService = reactiveUserService;
    }

    /**
     * Loads one user, blocking for up to five seconds.
     *
     * @param id the user id
     * @return the user, or {@code null} when the lookup completes empty
     */
    public User findById(Long id) {
        return reactiveUserService.findById(id)
            .block(Duration.ofSeconds(5));
    }

    /**
     * Loads all users into a list, blocking for up to five seconds.
     *
     * @return the collected users
     */
    public List<User> findAll() {
        return reactiveUserService.findAll()
            .collectList()
            .block(Duration.ofSeconds(5));
    }
}
通过了解这些常见问题和解决方案,开发者可以更好地应对响应式编程中的各种挑战,构建更加稳定、高效的响应式应用程序。