Spring 响应式编程特性详解

Spring 框架提供了全面的响应式编程支持,主要通过 Spring WebFlux、Project Reactor 和 Spring Data R2DBC 等模块实现。以下是 Spring 响应式编程的主要特性:

1. 响应式核心组件

Project Reactor

  • Mono: 表示 0-1 个元素的异步序列,用于返回单个结果或空结果
  • Flux: 表示 0-N 个元素的异步序列,用于处理多个元素的数据流
  • Scheduler: 提供灵活的线程调度机制,支持并行、顺序、定时等多种执行模式

Mono 使用示例

import reactor.core.publisher.Mono;
import java.time.Duration;

/**
 * Tour of the most common {@link Mono} factories and operators.
 *
 * <p>Only {@code mono1} is actually subscribed to; the remaining pipelines are
 * assembled to show the operator syntax and never execute.
 */
public class MonoExample {
    public static void main(String[] args) {
        // Mono holding a single, already-available value
        Mono<String> mono1 = Mono.just("Hello, Reactor!");

        // Mono that completes without emitting a value
        Mono<String> mono2 = Mono.empty();

        // Mono that terminates immediately with an error
        Mono<String> mono3 = Mono.error(new RuntimeException("Something went wrong"));

        // Deferred Mono: the supplier is only invoked once someone subscribes.
        // fromSupplier by itself does not move the work onto another thread.
        Mono<String> mono4 = Mono.fromSupplier(() -> {
            pause(1000); // simulate a slow operation
            return "Async result";
        });

        // Subscribe with value, error and completion callbacks
        mono1.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error.getMessage()),
            () -> System.out.println("Completed!")
        );

        // map: synchronous value transformation
        Mono<Integer> mono5 = Mono.just(10)
            .map(x -> x * 2)
            .map(x -> x + 5);

        // flatMap: transformation that itself returns a Mono
        Mono<String> mono6 = Mono.just("user123")
            .flatMap(userId -> fetchUserAsync(userId));

        // filter: drop the value when the predicate fails, then fall back to 0
        Mono<Integer> mono7 = Mono.just(15)
            .filter(x -> x > 10)
            .switchIfEmpty(Mono.just(0));

        // defaultIfEmpty: supply a value when the source is empty
        Mono<String> mono8 = Mono.<String>empty()
            .defaultIfEmpty("Default value");

        // timeout: switch to a fallback when no value arrives within 1 second
        Mono<String> mono9 = Mono.fromSupplier(() -> {
            pause(2000);
            return "Delayed result";
        }).timeout(Duration.ofMillis(1000), Mono.just("Timeout fallback"));

        // retry: resubscribe up to 3 times when the supplier throws
        Mono<String> mono10 = Mono.fromSupplier(() -> {
            if (Math.random() > 0.7) {
                throw new RuntimeException("Random error");
            }
            return "Success";
        }).retry(3);
    }

    /** Returns a Mono that emits {@code "User: " + userId}. */
    private static Mono<String> fetchUserAsync(String userId) {
        return Mono.just("User: " + userId);
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * <p>If the thread is interrupted the interrupt flag is restored so that
     * callers further up the stack can still observe the interruption.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Flux 使用示例

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class FluxExample {
    public static void main(String[] args) {
        // 创建一个包含多个值的 Flux
        Flux<String> flux1 = Flux.just("A", "B", "C", "D", "E");
        
        // 从集合创建 Flux
        List<String> list = Arrays.asList("X", "Y", "Z");
        Flux<String> flux2 = Flux.fromIterable(list);
        
        // 创建一个范围的 Flux
        Flux<Integer> flux3 = Flux.range(1, 10);
        
        // 创建一个间隔的 Flux
        Flux<Long> flux4 = Flux.interval(Duration.ofSeconds(1));
        
        // 订阅并处理结果
        flux1.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error.getMessage()),
            () -> System.out.println("Completed!")
        );
        
        // 使用 map 转换数据
        Flux<Integer> flux5 = Flux.range(1, 5)
            .map(x -> x * 2);
        
        // 使用 filter 过滤数据
        Flux<Integer> flux6 = Flux.range(1, 10)
            .filter(x -> x % 2 == 0);
        
        // 使用 flatMap 进行异步转换
        Flux<String> flux7 = Flux.just("user1", "user2", "user3")
            .flatMap(userId -> fetchUserDetailsAsync(userId));
        
        // 使用 collectList 将 Flux 转换为 Mono<List>
        Mono<List<Integer>> mono1 = Flux.range(1, 5)
            .collectList();
        
        // 使用 reduce 聚合数据
        Mono<Integer> mono2 = Flux.range(1, 5)
            .reduce((a, b) -> a + b);
        
        // 使用 buffer 批量处理
        Flux<List<Integer>> flux8 = Flux.range(1, 10)
            .buffer(3);
        
        // 使用 window 分组处理
        Flux<Flux<Integer>> flux9 = Flux.range(1, 10)
            .window(3);
        
        // 使用 merge 合并多个 Flux
        Flux<Integer> flux10 = Flux.merge(
            Flux.range(1, 3),
            Flux.range(4, 3),
            Flux.range(7, 3)
        );
        
        // 使用 zip 组合多个 Flux
        Flux<String> flux11 = Flux.zip(
            Flux.just("A", "B", "C"),
            Flux.just(1, 2, 3),
            Flux.just(true, false, true)
        ).map(tuple -> tuple.getT1() + "-" + tuple.getT2() + "-" + tuple.getT3());
        
        // 使用 take 和 skip
        Flux<Integer> flux12 = Flux.range(1, 10)
            .skip(2)
            .take(5);
        
        // 使用 distinct 去重
        Flux<Integer> flux13 = Flux.just(1, 2, 2, 3, 3, 3, 4)
            .distinct();
        
        // 使用 distinctUntilChanged 去除连续重复
        Flux<Integer> flux14 = Flux.just(1, 1, 2, 2, 2, 3, 3)
            .distinctUntilChanged();
        
        // 使用 throttleFirst 限流
        Flux<Integer> flux15 = Flux.interval(Duration.ofMillis(100))
            .map(Long::intValue)
            .throttleFirst(Duration.ofMillis(500));
        
        // 使用 delayElements 延迟发射
        Flux<Integer> flux16 = Flux.range(1, 5)
            .delayElements(Duration.ofMillis(500));
        
        // 使用 doOnNext 和 doOnError 添加副作用
        Flux<Integer> flux17 = Flux.range(1, 5)
            .doOnNext(value -> System.out.println("Processing: " + value))
            .doOnError(error -> System.err.println("Error: " + error))
            .doOnComplete(() -> System.out.println("All done!"));
        
        // 使用 onErrorResume 错误恢复
        Flux<String> flux18 = Flux.just("A", "B", "C")
            .map(s -> {
                if (s.equals("B")) {
                    throw new RuntimeException("Error on B");
                }
                return s;
            })
            .onErrorResume(error -> Flux.just("Fallback1", "Fallback2"));
        
        // 使用 onErrorReturn 返回默认值
        Flux<String> flux19 = Flux.just("A", "B", "C")
            .map(s -> {
                if (s.equals("B")) {
                    throw new RuntimeException("Error on B");
                }
                return s;
            })
            .onErrorReturn("Default");
    }
    
    private static Flux<String> fetchUserDetailsAsync(String userId) {
        return Flux.just(userId + "-detail1", userId + "-detail2");
    }
}

Scheduler 使用示例

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

public class SchedulerExample {
    public static void main(String[] args) throws InterruptedException {
        // 使用 Schedulers.parallel() 并行处理
        Flux.range(1, 10)
            .parallel()
            .runOn(Schedulers.parallel())
            .map(i -> {
                System.out.println("Processing " + i + " on thread: " + Thread.currentThread().getName());
                return i * 2;
            })
            .sequential()
            .subscribe();
        
        Thread.sleep(1000);
        
        // 使用 Schedulers.elastic() 处理 I/O 操作
        Flux.just("file1.txt", "file2.txt", "file3.txt")
            .flatMap(filename ->
                Mono.fromCallable(() -> readFile(filename))
                    .subscribeOn(Schedulers.elastic())
            )
            .subscribe(content -> System.out.println("Content: " + content));
        
        Thread.sleep(1000);
        
        // 使用 Schedulers.single() 单线程执行
        Flux.range(1, 5)
            .publishOn(Schedulers.single())
            .map(i -> {
                System.out.println("Single thread: " + Thread.currentThread().getName());
                return i;
            })
            .subscribe();
        
        Thread.sleep(1000);
        
        // 使用 Schedulers.boundedElastic() 有界弹性调度器
        Flux.range(1, 20)
            .flatMap(i ->
                Mono.fromCallable(() -> heavyTask(i))
                    .subscribeOn(Schedulers.boundedElastic())
            )
            .subscribe();
        
        Thread.sleep(2000);
    }
    
    private static String readFile(String filename) {
        // 模拟文件读取
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Content of " + filename;
    }
    
    private static Integer heavyTask(int i) {
        // 模拟耗时任务
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i * i;
    }
}

背压处理

  • 实现了 Reactive Streams 规范,提供完善的背压机制
  • 支持多种背压策略:缓冲、丢弃、最新值等
  • 防止快速生产者压垮慢速消费者

背压处理示例

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        // 使用 onBackpressureBuffer 缓冲策略
        Flux.range(1, 1000)
            .onBackpressureBuffer(10) // 缓冲最多10个元素
            .subscribeOn(Schedulers.parallel())
            .publishOn(Schedulers.single())
            .subscribe(
                value -> {
                    System.out.println("Processing: " + value);
                    try {
                        Thread.sleep(100); // 模拟慢速消费者
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },
                error -> System.err.println("Error: " + error)
            );
        
        Thread.sleep(2000);
        
        // 使用 onBackpressureDrop 丢弃策略
        Flux.range(1, 1000)
            .onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))
            .subscribeOn(Schedulers.parallel())
            .publishOn(Schedulers.single())
            .subscribe(
                value -> {
                    System.out.println("Processing: " + value);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },
                error -> System.err.println("Error: " + error)
            );
        
        Thread.sleep(2000);
        
        // 使用 onBackpressureLatest 保留最新值策略
        Flux.interval(Duration.ofMillis(10))
            .onBackpressureLatest()
            .subscribeOn(Schedulers.parallel())
            .publishOn(Schedulers.single())
            .subscribe(
                value -> {
                    System.out.println("Latest: " + value);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },
                error -> System.err.println("Error: " + error)
            );
        
        Thread.sleep(2000);
        
        // 使用 onBackpressureError 错误策略
        Flux.range(1, 1000)
            .onBackpressureError()
            .subscribeOn(Schedulers.parallel())
            .publishOn(Schedulers.single())
            .subscribe(
                value -> {
                    System.out.println("Processing: " + value);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },
                error -> System.err.println("Backpressure error: " + error)
            );
        
        Thread.sleep(2000);
    }
}

2. Spring WebFlux 特性

非阻塞 Web 框架

  • 完全非阻塞、事件驱动的架构
  • 支持 Reactive Streams 背压
  • 适用于高并发、低延迟场景

多种运行时支持

  • Netty 作为默认服务器
  • 支持 Servlet 3.1+ 容器(如 Tomcat、Jetty)
  • 支持 Undertow 作为服务器

两种编程模型(注解式与函数式)

  • 注解式控制器(类似 Spring MVC)
  • 函数式路由(RouterFunction 和 HandlerFunction)
  • 轻量级、函数式的开发方式

注解式控制器示例

import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/users")
public class UserController {
    
    private final UserService userService;
    
    public UserController(UserService userService) {
        this.userService = userService;
    }
    
    // 获取所有用户
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAll();
    }
    
    // 根据 ID 获取用户
    @GetMapping("/{id}")
    public Mono<User> getUserById(@PathVariable String id) {
        return userService.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException("User not found")));
    }
    
    // 创建用户
    @PostMapping
    public Mono<User> createUser(@RequestBody User user) {
        return userService.save(user);
    }
    
    // 更新用户
    @PutMapping("/{id}")
    public Mono<User> updateUser(@PathVariable String id, @RequestBody User user) {
        return userService.update(id, user);
    }
    
    // 删除用户
    @DeleteMapping("/{id}")
    public Mono<Void> deleteUser(@PathVariable String id) {
        return userService.deleteById(id);
    }
    
    // 批量创建用户
    @PostMapping("/batch")
    public Flux<User> createUsers(@RequestBody Flux<User> users) {
        return userService.saveAll(users);
    }
    
    // 搜索用户
    @GetMapping("/search")
    public Flux<User> searchUsers(
        @RequestParam(required = false) String name,
        @RequestParam(required = false) String email
    ) {
        return userService.search(name, email);
    }
    
    // 分页查询
    @GetMapping("/page")
    public Flux<User> getUsersByPage(
        @RequestParam(defaultValue = "0") int page,
        @RequestParam(defaultValue = "10") int size
    ) {
        return userService.findByPage(page, size);
    }
}

/**
 * Plain mutable user object carrying an id, name, email and age.
 */
class User {
    private String id;
    private String name;
    private String email;
    private Integer age;

    /** Creates a user with every field left {@code null}. */
    public User() {
    }

    /** Creates a fully populated user. */
    public User(String id, String name, String email, Integer age) {
        setId(id);
        setName(name);
        setEmail(email);
        setAge(age);
    }

    // --- id ---

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // --- name ---

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // --- email ---

    public String getEmail() {
        return this.email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    // --- age ---

    public Integer getAge() {
        return this.age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

/**
 * Unchecked exception signalling that a requested user does not exist.
 */
class UserNotFoundException extends RuntimeException {

    /**
     * @param message human-readable description, exposed through {@link #getMessage()}
     */
    public UserNotFoundException(final String message) {
        super(message);
    }
}

函数式路由示例

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

/**
 * Functional-style route table for the user endpoints, delegating to {@code UserHandler}.
 */
@Configuration
public class UserRouter {

    private final UserHandler userHandler;

    public UserRouter(UserHandler userHandler) {
        this.userHandler = userHandler;
    }

    /**
     * Builds the router function for {@code /api/users}.
     *
     * <p>Routes are evaluated in declaration order, so the fixed paths
     * {@code /search} and {@code /page} must be declared before the
     * {@code /{id}} pattern; otherwise {@code /{id}} would capture them
     * with {@code id = "search"} or {@code id = "page"}.
     */
    @Bean
    public RouterFunction<ServerResponse> userRoutes() {
        return RouterFunctions.route()
            // List all users
            .GET("/api/users", userHandler::getAllUsers)
            // Search users (must precede the {id} pattern)
            .GET("/api/users/search", userHandler::searchUsers)
            // Paged listing (must precede the {id} pattern)
            .GET("/api/users/page", userHandler::getUsersByPage)
            // Fetch a single user by id
            .GET("/api/users/{id}", userHandler::getUserById)
            // Create a user
            .POST("/api/users", userHandler::createUser)
            // Update a user
            .PUT("/api/users/{id}", userHandler::updateUser)
            // Delete a user
            .DELETE("/api/users/{id}", userHandler::deleteUser)
            .build();
    }
}

// 处理器类
@Component
class UserHandler {
    
    private final UserService userService;
    
    public UserHandler(UserService userService) {
        this.userService = userService;
    }
    
    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        Flux<User> users = userService.findAll();
        return ServerResponse.ok().body(users, User.class);
    }
    
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    
    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono.flatMap(user ->
            userService.save(user)
                .flatMap(savedUser -> ServerResponse.ok().bodyValue(savedUser))
        );
    }
    
    public Mono<ServerResponse> updateUser(ServerRequest request) {
        String id = request.pathVariable("id");
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono.flatMap(user ->
            userService.update(id, user)
                .flatMap(updatedUser -> ServerResponse.ok().bodyValue(updatedUser))
                .switchIfEmpty(ServerResponse.notFound().build())
        );
    }
    
    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.deleteById(id)
            .then(ServerResponse.noContent().build())
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    
    public Mono<ServerResponse> searchUsers(ServerRequest request) {
        String name = request.queryParam("name").orElse(null);
        String email = request.queryParam("email").orElse(null);
        Flux<User> users = userService.search(name, email);
        return ServerResponse.ok().body(users, User.class);
    }
    
    public Mono<ServerResponse> getUsersByPage(ServerRequest request) {
        int page = Integer.parseInt(request.queryParam("page").orElse("0"));
        int size = Integer.parseInt(request.queryParam("size").orElse("10"));
        Flux<User> users = userService.findByPage(page, size);
        return ServerResponse.ok().body(users, User.class);
    }
}

WebFlux 过滤器示例

import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

/**
 * {@link WebFilter} that prints the request line before the chain runs and the
 * response status plus elapsed time once the chain completes successfully.
 */
@Component
public class LoggingFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String path = exchange.getRequest().getPath().value();
        // String.valueOf instead of getMethod().name(): getMethod() may return null
        // on framework versions where it is nullable, and logging must not throw.
        String method = String.valueOf(exchange.getRequest().getMethod());

        System.out.println("Request: " + method + " " + path);

        long startTime = System.currentTimeMillis();

        return chain.filter(exchange)
            .doOnSuccess(aVoid -> {
                long duration = System.currentTimeMillis() - startTime;
                System.out.println("Response: " + exchange.getResponse().getStatusCode() +
                    " - Duration: " + duration + "ms");
            })
            .doOnError(error -> {
                System.err.println("Error: " + error.getMessage());
            });
    }
}

/**
 * {@link WebFilter} that lets a request through only when it carries a
 * {@code Bearer} token accepted by {@link #validateToken(String)}.
 *
 * <p>Missing or non-Bearer headers produce 401; a rejected token produces 403.
 */
@Component
public class AuthenticationFilter implements WebFilter {

    private static final String BEARER_PREFIX = "Bearer ";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        final String header = exchange.getRequest().getHeaders().getFirst("Authorization");

        boolean hasBearerToken = header != null && header.startsWith(BEARER_PREFIX);
        if (!hasBearerToken) {
            return reject(exchange, org.springframework.http.HttpStatus.UNAUTHORIZED);
        }

        // Strip the scheme prefix and check what is left
        final String credentials = header.substring(BEARER_PREFIX.length());
        if (validateToken(credentials)) {
            return chain.filter(exchange);
        }
        return reject(exchange, org.springframework.http.HttpStatus.FORBIDDEN);
    }

    /** Sets the given status on the response and completes it without a body. */
    private Mono<Void> reject(ServerWebExchange exchange, org.springframework.http.HttpStatus status) {
        exchange.getResponse().setStatusCode(status);
        return exchange.getResponse().setComplete();
    }

    /**
     * Placeholder check: accepts any non-empty token.
     * A real project should verify the token with a JWT library.
     */
    private boolean validateToken(String token) {
        return !token.isEmpty();
    }
}

全局异常处理示例

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.bind.support.WebExchangeBindException;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

/**
 * Global exception handling for annotated WebFlux controllers.
 *
 * <p>Handlers return {@code Mono<ResponseEntity<ErrorResponse>>}.
 * {@code ServerResponse} belongs to the functional endpoint model and is not a
 * supported return type for {@code @ExceptionHandler} methods.
 */
@RestControllerAdvice
public class GlobalExceptionHandler {

    /** Maps a missing user to 404 with code {@code USER_NOT_FOUND}. */
    @ExceptionHandler(UserNotFoundException.class)
    public Mono<ResponseEntity<ErrorResponse>> handleUserNotFound(
        UserNotFoundException ex,
        ServerWebExchange exchange
    ) {
        return respond(HttpStatus.NOT_FOUND, "USER_NOT_FOUND", ex.getMessage(), exchange);
    }

    /**
     * Maps request-body validation failures to 400 with code {@code VALIDATION_ERROR}.
     *
     * <p>WebFlux reports binding/validation failures as {@link WebExchangeBindException};
     * {@code MethodArgumentNotValidException} is what Spring MVC raises.
     */
    @ExceptionHandler(WebExchangeBindException.class)
    public Mono<ResponseEntity<ErrorResponse>> handleValidationException(
        WebExchangeBindException ex,
        ServerWebExchange exchange
    ) {
        return respond(HttpStatus.BAD_REQUEST, "VALIDATION_ERROR", "Invalid request parameters", exchange);
    }

    /** Maps any other exception to 500 with code {@code INTERNAL_SERVER_ERROR}. */
    @ExceptionHandler(Exception.class)
    public Mono<ResponseEntity<ErrorResponse>> handleGenericException(
        Exception ex,
        ServerWebExchange exchange
    ) {
        // NOTE(review): ex.getMessage() is sent to the client verbatim and may expose
        // internal details; consider a generic message and logging the exception instead.
        return respond(HttpStatus.INTERNAL_SERVER_ERROR, "INTERNAL_SERVER_ERROR", ex.getMessage(), exchange);
    }

    /** Builds the error body for the current request path and wraps it with the given status. */
    private static Mono<ResponseEntity<ErrorResponse>> respond(
        HttpStatus status,
        String code,
        String message,
        ServerWebExchange exchange
    ) {
        ErrorResponse body = new ErrorResponse(code, message, exchange.getRequest().getPath().value());
        return Mono.just(ResponseEntity.status(status).body(body));
    }
}

/**
 * Error payload: an error code, a message, the request path and the creation
 * time in epoch milliseconds.
 */
class ErrorResponse {
    private String code;
    private String message;
    private String path;
    private long timestamp;

    /**
     * Creates an error payload stamped with the current time
     * ({@link System#currentTimeMillis()}).
     */
    public ErrorResponse(String code, String message, String path) {
        setCode(code);
        setMessage(message);
        setPath(path);
        setTimestamp(System.currentTimeMillis());
    }

    public String getCode() {
        return this.code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getPath() {
        return this.path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

3. 响应式数据访问

Spring Data R2DBC

  • 响应式关系型数据库访问
  • 完全非阻塞的数据库操作
  • 支持主流关系型数据库(PostgreSQL、MySQL、H2 等)

R2DBC 实体类示例

import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.data.relational.core.mapping.Column;

/**
 * Entity mapped to the {@code users} table.
 *
 * <p>{@code LocalDateTime} is written with its fully qualified name because this
 * example's import list does not include {@code java.time.LocalDateTime}.
 */
@Table("users")
public class User {

    @Id
    private Long id;

    @Column("username")
    private String username;

    @Column("email")
    private String email;

    @Column("age")
    private Integer age;

    @Column("created_at")
    private java.time.LocalDateTime createdAt;

    /** No-arg constructor; leaves every field {@code null}. */
    public User() {}

    /** Creates a new user and stamps {@code createdAt} with the current local date-time. */
    public User(String username, String email, Integer age) {
        this.username = username;
        this.email = email;
        this.age = age;
        this.createdAt = java.time.LocalDateTime.now();
    }

    // Accessors
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public java.time.LocalDateTime getCreatedAt() { return createdAt; }
    public void setCreatedAt(java.time.LocalDateTime createdAt) { this.createdAt = createdAt; }
}

R2DBC Repository 示例

import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive repository for {@code User} entities with {@code Long} ids.
 *
 * <p>The finder methods below carry no implementation; presumably Spring Data
 * derives each query from the method name — confirm against the Spring Data
 * R2DBC query-derivation rules.
 */
@Repository
public interface UserRepository extends R2dbcRepository<User, Long> {
    
    // Find a user by username
    Mono<User> findByUsername(String username);
    
    // Find a user by email
    Mono<User> findByEmail(String email);
    
    // Find users whose age lies in the given range
    Flux<User> findByAgeBetween(Integer minAge, Integer maxAge);
    
    // Find users whose username contains the keyword
    Flux<User> findByUsernameContaining(String keyword);
    
    // Check whether a username already exists
    Mono<Boolean> existsByUsername(String username);
    
    // Count the users with the given age
    Mono<Long> countByAge(Integer age);
}

R2DBC Service 示例

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive service layer on top of {@code UserRepository}.
 *
 * <p>Write operations are marked {@code @Transactional}; {@link #transferUser}
 * shows the programmatic alternative using {@code TransactionalOperator}.
 */
@Service
public class UserService {

    private final UserRepository userRepository;
    private final TransactionalOperator rxtx;

    public UserService(UserRepository userRepository, TransactionalOperator rxtx) {
        this.userRepository = userRepository;
        this.rxtx = rxtx;
    }

    /** Returns all users. */
    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    /** Returns the user with the given id, or an empty Mono when the repository finds none. */
    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }

    /** Saves a single user. */
    @Transactional
    public Mono<User> save(User user) {
        return userRepository.save(user);
    }

    /** Saves a stream of users. */
    @Transactional
    public Flux<User> saveAll(Flux<User> users) {
        return userRepository.saveAll(users);
    }

    /**
     * Copies username, email and age from {@code user} onto the stored user with
     * the given id and saves it. Yields an empty Mono when no such user exists.
     */
    @Transactional
    public Mono<User> update(Long id, User user) {
        return userRepository.findById(id)
            .flatMap(existingUser -> {
                existingUser.setUsername(user.getUsername());
                existingUser.setEmail(user.getEmail());
                existingUser.setAge(user.getAge());
                return userRepository.save(existingUser);
            });
    }

    /** Deletes the user with the given id. */
    @Transactional
    public Mono<Void> deleteById(Long id) {
        return userRepository.deleteById(id);
    }

    /** Finds a user by username. */
    public Mono<User> findByUsername(String username) {
        return userRepository.findByUsername(username);
    }

    /** Finds a user by email. */
    public Mono<User> findByEmail(String email) {
        return userRepository.findByEmail(email);
    }

    /** Finds users whose age lies between {@code minAge} and {@code maxAge}. */
    public Flux<User> findByAgeBetween(Integer minAge, Integer maxAge) {
        return userRepository.findByAgeBetween(minAge, maxAge);
    }

    /**
     * Searches users by optional criteria.
     *
     * @param name  substring the username must contain, or {@code null} for no name filter
     * @param email substring the email must contain, or {@code null} for no email filter
     * @return matching users; all users when both criteria are {@code null}
     */
    public Flux<User> search(String name, String email) {
        Flux<User> candidates = (name != null)
            ? userRepository.findByUsernameContaining(name)
            : userRepository.findAll();
        if (email == null) {
            return candidates;
        }
        // The email filter runs in memory; users without an email never match
        // (previously a null email caused a NullPointerException here).
        return candidates.filter(user -> user.getEmail() != null && user.getEmail().contains(email));
    }

    /**
     * Returns one page of users by skipping {@code page * size} elements of
     * {@code findAll()} and taking the next {@code size}.
     *
     * <p>The offset is computed in {@code long} arithmetic so that large page
     * numbers cannot overflow {@code int} and wrap to a wrong offset.
     */
    public Flux<User> findByPage(int page, int size) {
        return userRepository.findAll()
            .skip((long) page * size)
            .take(size);
    }

    /**
     * Programmatic transaction example: loads both users and saves them inside
     * a single transaction applied with {@code TransactionalOperator}.
     */
    public Mono<Void> transferUser(Long fromId, Long toId) {
        return userRepository.findById(fromId)
            .flatMap(fromUser -> userRepository.findById(toId)
                .flatMap(toUser -> {
                    // Business logic would go here
                    return userRepository.save(fromUser)
                        .then(userRepository.save(toUser));
                })
            )
            .then()
            .as(rxtx::transactional); // wrap the whole pipeline in one transaction
    }
}

R2DBC 配置示例

import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.r2dbc.core.DatabaseClient;

@Configuration
public class R2dbcConfig extends AbstractR2dbcConfiguration {
    
    @Override
    protected String connectionUrl() {
        return "r2dbc:postgresql://localhost:5432/reactor_db";
    }
    
    @Override
    protected String username() {
        return "postgres";
    }
    
    @Override
    protected String password() {
        return "password";
    }
    
    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        return DatabaseClient.create(connectionFactory);
    }
    
    @Bean
    public TransactionalOperator transactionalOperator(ConnectionFactory connectionFactory) {
        return TransactionalOperator.create(R2dbcTransactionManager.create(connectionFactory));
    }
}

响应式 NoSQL 支持

  • Spring Data MongoDB 提供响应式模板和仓库
  • Spring Data Redis 支持响应式操作
  • Spring Data Cassandra 提供响应式模板和仓库支持

MongoDB 响应式示例

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * MongoDB document stored in the {@code products} collection.
 */
@Document(collection = "products")
public class Product {

    @Id
    private String id;
    private String name;
    private Double price;
    private String category;
    private Integer stock;

    /** No-arg constructor; leaves every field {@code null}. */
    public Product() {
    }

    /** Creates a product without an id. */
    public Product(String name, Double price, String category, Integer stock) {
        setName(name);
        setPrice(price);
        setCategory(category);
        setStock(stock);
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Double getPrice() {
        return this.price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public String getCategory() {
        return this.category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public Integer getStock() {
        return this.stock;
    }

    public void setStock(Integer stock) {
        this.stock = stock;
    }
}

/**
 * Reactive MongoDB repository for {@code Product} documents with {@code String} ids.
 *
 * <p>The finder methods below carry no implementation; presumably Spring Data
 * derives each query from the method name — confirm against the Spring Data
 * MongoDB query-derivation rules.
 */
@Repository
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
    
    // Find products by category
    Flux<Product> findByCategory(String category);
    
    // Find products whose price lies in the given range
    Flux<Product> findByPriceBetween(Double minPrice, Double maxPrice);
    
    // Find products whose name contains the keyword
    Flux<Product> findByNameContaining(String keyword);
    
    // Find products whose stock is below the threshold
    Flux<Product> findByStockLessThan(Integer threshold);
}

/**
 * Service layer that forwards product queries and writes to {@link ProductRepository}.
 */
@Service
public class ProductService {

    private final ProductRepository productRepository;

    /**
     * @param productRepository repository every operation is delegated to
     */
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    /**
     * Persists the given product.
     *
     * @param product product to store
     * @return the stored product as returned by the repository
     */
    public Mono<Product> save(Product product) {
        Mono<Product> stored = productRepository.save(product);
        return stored;
    }

    /**
     * Looks up a single product.
     *
     * @param id product identifier
     * @return the repository's result for that id
     */
    public Mono<Product> findById(String id) {
        Mono<Product> found = productRepository.findById(id);
        return found;
    }

    /**
     * Streams every product.
     *
     * @return all products known to the repository
     */
    public Flux<Product> findAll() {
        Flux<Product> everything = productRepository.findAll();
        return everything;
    }

    /**
     * Streams the products of one category.
     *
     * @param category category to match
     * @return the matching products
     */
    public Flux<Product> findByCategory(String category) {
        Flux<Product> inCategory = productRepository.findByCategory(category);
        return inCategory;
    }

    /**
     * Streams the products within a price range.
     *
     * @param minPrice lower price bound
     * @param maxPrice upper price bound
     * @return the matching products
     */
    public Flux<Product> findByPriceRange(Double minPrice, Double maxPrice) {
        Flux<Product> inRange = productRepository.findByPriceBetween(minPrice, maxPrice);
        return inRange;
    }
}

Redis 响应式示例

import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class RedisService {
    
    private final ReactiveRedisTemplate<String, Object> redisTemplate;
    private final ReactiveStringRedisTemplate stringRedisTemplate;
    
    public RedisService(
        ReactiveRedisTemplate<String, Object> redisTemplate,
        ReactiveStringRedisTemplate stringRedisTemplate
    ) {
        this.redisTemplate = redisTemplate;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    // 设置键值对
    public Mono<Boolean> set(String key, Object value) {
        return redisTemplate.opsForValue().set(key, value);
    }
    
    // 设置键值对并指定过期时间
    public Mono<Boolean> set(String key, Object value, long timeout, java.util.concurrent.TimeUnit unit) {
        return redisTemplate.opsForValue().set(key, value, timeout, unit);
    }
    
    // 获取值
    public Mono<Object> get(String key) {
        return redisTemplate.opsForValue().get(key);
    }
    
    // 删除键
    public Mono<Boolean> delete(String key) {
        return redisTemplate.delete(key).map(count -> count > 0);
    }
    
    // 检查键是否存在
    public Mono<Boolean> exists(String key) {
        return redisTemplate.hasKey(key);
    }
    
    // 设置过期时间
    public Mono<Boolean> expire(String key, long timeout, java.util.concurrent.TimeUnit unit) {
        return redisTemplate.expire(key, timeout, unit);
    }
    
    // 获取过期时间
    public Mono<Long> getExpire(String key) {
        return redisTemplate.getExpire(key);
    }
    
    // 列表操作 - 左推
    public Mono<Long> lpush(String key, Object value) {
        return redisTemplate.opsForList().leftPush(key, value);
    }
    
    // 列表操作 - 右推
    public Mono<Long> rpush(String key, Object value) {
        return redisTemplate.opsForList().rightPush(key, value);
    }
    
    // 列表操作 - 左弹
    public Mono<Object> lpop(String key) {
        return redisTemplate.opsForList().leftPop(key);
    }
    
    // 列表操作 - 范围查询
    public Flux<Object> lrange(String key, long start, long end) {
        return redisTemplate.opsForList().range(key, start, end);
    }
    
    // 哈希操作 - 设置字段
    public Mono<Boolean> hset(String key, String field, Object value) {
        return redisTemplate.opsForHash().put(key, field, value);
    }
    
    // 哈希操作 - 获取字段
    public Mono<Object> hget(String key, String field) {
        return redisTemplate.opsForHash().get(key, field);
    }
    
    // 哈希操作 - 获取所有字段
    public Flux<Map.Entry<String, Object>> hgetall(String key) {
        return redisTemplate.opsForHash().entries(key);
    }
    
    // 集合操作 - 添加成员
    public Mono<Long> sadd(String key, Object value) {
        return redisTemplate.opsForSet().add(key, value);
    }
    
    // 集合操作 - 获取所有成员
    public Flux<Object> smembers(String key) {
        return redisTemplate.opsForSet().members(key);
    }
    
    // 有序集合操作 - 添加成员
    public Mono<Boolean> zadd(String key, Object value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }
    
    // 有序集合操作 - 范围查询
    public Flux<Object> zrange(String key, long start, long end) {
        return redisTemplate.opsForZSet().range(key, start, end);
    }
    
    // 发布订阅 - 发布消息
    public Mono<Long> publish(String channel, String message) {
        return stringRedisTemplate.convertAndSend(channel, message);
    }
    
    // 发布订阅 - 订阅频道
    public Flux<String> subscribe(String channel) {
        return stringRedisTemplate.listenToChannel(channel)
            .map(message -> message.getMessage());
    }
}

响应式事务管理

  • 支持响应式事务(ReactiveTransactionManager)
  • 与命令式事务共用 @Transactional 编程模型,但底层使用独立的 ReactiveTransactionManager(而非 PlatformTransactionManager)
  • 支持编程式和声明式事务

响应式事务示例

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class TransactionService {
    
    private final UserRepository userRepository;
    private final OrderRepository orderRepository;
    private final TransactionalOperator rxtx;
    
    public TransactionService(
        UserRepository userRepository,
        OrderRepository orderRepository,
        TransactionalOperator rxtx
    ) {
        this.userRepository = userRepository;
        this.orderRepository = orderRepository;
        this.rxtx = rxtx;
    }
    
    // 声明式事务 - 使用 @Transactional 注解
    @Transactional
    public Mono<Void> createUserWithOrder(User user, Order order) {
        return userRepository.save(user)
            .flatMap(savedUser -> {
                order.setUserId(savedUser.getId());
                return orderRepository.save(order);
            })
            .then();
    }
    
    // 编程式事务 - 使用 TransactionalOperator
    public Mono<Void> transferMoney(Long fromUserId, Long toUserId, Double amount) {
        return userRepository.findById(fromUserId)
            .flatMap(fromUser -> userRepository.findById(toUserId)
                .flatMap(toUser -> {
                    // 扣款
                    fromUser.setBalance(fromUser.getBalance() - amount);
                    // 加款
                    toUser.setBalance(toUser.getBalance() + amount);
                    
                    // 保存两个用户
                    return userRepository.save(fromUser)
                        .then(userRepository.save(toUser));
                })
            )
            .as(rxtx::transactional) // 应用事务
            .then();
    }
    
    // 嵌套事务
    @Transactional
    public Mono<Void> complexOperation(User user, Order order, Payment payment) {
        return userRepository.save(user)
            .flatMap(savedUser -> {
                order.setUserId(savedUser.getId());
                return orderRepository.save(order);
            })
            .flatMap(savedOrder -> {
                payment.setOrderId(savedOrder.getId());
                return processPayment(payment);
            })
            .then();
    }
    
    @Transactional
    private Mono<Payment> processPayment(Payment payment) {
        // 处理支付逻辑
        return Mono.just(payment);
    }
    
    // 批量操作事务
    @Transactional
    public Flux<User> batchCreateUsers(Flux<User> users) {
        return users.flatMap(userRepository::save);
    }
}

4. 响应式安全特性

Spring Security 5.x 响应式支持

  • 完全响应式的认证和授权
  • 支持 JWT、OAuth2 等认证方式
  • 响应式方法级安全控制

Spring Security 配置示例

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;

/**
 * WebFlux security configuration: URL authorization rules, in-memory demo
 * users and the password encoder.
 */
@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig {

    /**
     * Builds the filter chain with path-based authorization and status-only
     * responses for authentication/authorization failures.
     *
     * @param http security builder supplied by the framework
     * @return the configured filter chain
     */
    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
        return http
            .authorizeExchange(exchanges -> exchanges
                // Public endpoints
                .pathMatchers("/api/public/**", "/auth/**").permitAll()
                // Administrator endpoints
                .pathMatchers("/api/admin/**").hasRole("ADMIN")
                // User endpoints
                .pathMatchers("/api/users/**").hasAnyRole("USER", "ADMIN")
                // Everything else requires authentication
                .anyExchange().authenticated()
            )
            // CSRF protection is switched off (REST API)
            .csrf(csrf -> csrf.disable())
            // Spring Security's CORS integration is switched off here, not configured
            .cors(cors -> cors.disable())
            // Answer with a bare status code instead of a redirect or login page
            .exceptionHandling(exceptions -> exceptions
                .authenticationEntryPoint((exchange, ex) -> {
                    exchange.getResponse().setStatusCode(org.springframework.http.HttpStatus.UNAUTHORIZED);
                    return exchange.getResponse().setComplete();
                })
                .accessDeniedHandler((exchange, ex) -> {
                    exchange.getResponse().setStatusCode(org.springframework.http.HttpStatus.FORBIDDEN);
                    return exchange.getResponse().setComplete();
                })
            )
            .build();
    }

    /**
     * In-memory user store with one admin and one regular user.
     *
     * <p>Demo only: the credentials are hard-coded. A real application should
     * load users from a database.
     *
     * @return user details service holding the two demo users
     */
    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        PasswordEncoder encoder = passwordEncoder();

        // Each user gets its own builder so no state leaks from one to the other.
        var admin = User.withUsername("admin")
            .password(encoder.encode("admin123"))
            .roles("ADMIN")
            .build();
        var user = User.withUsername("user")
            .password(encoder.encode("user123"))
            .roles("USER")
            .build();

        return new MapReactiveUserDetailsService(admin, user);
    }

    /**
     * @return the BCrypt encoder used to hash and verify passwords
     */
    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }
}

JWT 认证示例

import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

/**
 * Authentication manager that treats the credentials of the incoming
 * {@link Authentication} as a JWT and resolves it through {@code JwtTokenProvider}.
 */
@Component
public class JwtAuthenticationManager implements ReactiveAuthenticationManager {

    private final JwtTokenProvider jwtTokenProvider;

    /**
     * @param jwtTokenProvider provider used to read and validate tokens
     */
    public JwtAuthenticationManager(JwtTokenProvider jwtTokenProvider) {
        this.jwtTokenProvider = jwtTokenProvider;
    }

    /**
     * Resolves the token carried in {@code authentication.getCredentials()}.
     *
     * @param authentication request whose credentials hold the raw token
     * @return an authenticated token with the username and authorities read
     *         from the JWT; empty when the credentials are missing, no username
     *         can be read or the token is not valid; an error signal when the
     *         provider throws
     */
    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
        // defer() keeps all work inside the reactive pipeline, so anything thrown
        // below surfaces as an error signal rather than a synchronous exception.
        return Mono.defer(() -> {
            Object credentials = authentication == null ? null : authentication.getCredentials();
            if (credentials == null) {
                return Mono.empty();
            }
            String authToken = credentials.toString();

            String username = jwtTokenProvider.getUsernameFromToken(authToken);
            if (username == null || !jwtTokenProvider.validateToken(authToken)) {
                return Mono.empty();
            }

            // Turn the authority names carried by the token into granted authorities.
            var authorities = jwtTokenProvider.getAuthoritiesFromToken(authToken)
                .stream()
                .map(SimpleGrantedAuthority::new)
                .toList();

            return Mono.<Authentication>just(new UsernamePasswordAuthenticationToken(
                username,
                null,
                authorities
            ));
        });
    }
}

/**
 * Simplified stand-in for a JWT provider.
 *
 * <p>Tokens are the fixed prefix {@code jwt-token-for-} followed by the
 * username. Nothing is signed or expired; a real implementation should use a
 * JWT library.
 */
@Component
public class JwtTokenProvider {

    // NOTE(review): hard-coded placeholder secret, unused by this simplified
    // implementation. Load the real key from external configuration.
    private static final String SECRET_KEY = "your-secret-key";
    private static final long EXPIRATION_TIME = 86400000; // 24 hours in milliseconds

    /** Prefix shared by every token this provider issues and accepts. */
    private static final String TOKEN_PREFIX = "jwt-token-for-";

    /**
     * Issues a token for the authenticated principal.
     *
     * @param authentication authentication whose name is embedded in the token
     * @return the token prefix followed by the principal's name
     */
    public String generateToken(Authentication authentication) {
        // Simplified: a real implementation would build a signed JWT with claims.
        return TOKEN_PREFIX + authentication.getName();
    }

    /**
     * Extracts the username from a token.
     *
     * @param token raw token, may be {@code null}
     * @return the text after the prefix, or {@code null} when the token is
     *         {@code null} or does not start with the prefix
     */
    public String getUsernameFromToken(String token) {
        // Simplified: a real implementation would parse the JWT.
        if (token != null && token.startsWith(TOKEN_PREFIX)) {
            return token.substring(TOKEN_PREFIX.length());
        }
        return null;
    }

    /**
     * Checks whether a token has the expected shape.
     *
     * @param token raw token, may be {@code null}
     * @return {@code true} when the token is non-null and starts with the prefix
     */
    public boolean validateToken(String token) {
        // Simplified: a real implementation would verify signature and expiry.
        return token != null && token.startsWith(TOKEN_PREFIX);
    }

    /**
     * Returns the authorities carried by a token.
     *
     * @param token raw token (ignored by this simplified implementation)
     * @return always the single authority {@code ROLE_USER}
     */
    public java.util.List<String> getAuthoritiesFromToken(String token) {
        // Simplified: a real implementation would read the authorities from the JWT.
        return java.util.List.of("ROLE_USER");
    }
}

// JWT 认证过滤器
@Component
public class JwtAuthenticationFilter implements WebFilter {
    
    private final JwtAuthenticationManager authenticationManager;
    
    public JwtAuthenticationFilter(JwtAuthenticationManager authenticationManager) {
        this.authenticationManager = authenticationManager;
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String authHeader = exchange.getRequest().getHeaders().getFirst("Authorization");
        
        if (authHeader != null && authHeader.startsWith("Bearer ")) {
            String token = authHeader.substring(7);
            
            return authenticationManager.authenticate(
                new UsernamePasswordAuthenticationToken(null, token)
            ).flatMap(authentication -> {
                // 将认证信息存储到安全上下文中
                return chain.filter(exchange)
                    .contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication));
            }).switchIfEmpty(chain.filter(exchange));
        }
        
        return chain.filter(exchange);
    }
}

方法级安全示例

import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * User operations guarded by method-level {@code @PreAuthorize} expressions;
 * every method delegates to {@link UserRepository}.
 */
@Service
public class SecureUserService {

    private final UserRepository userRepository;

    /**
     * @param userRepository repository every operation is delegated to
     */
    public SecureUserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /** Lists all users; requires role ADMIN. */
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        Flux<User> everyone = userRepository.findAll();
        return everyone;
    }

    /** Loads one user; requires role USER or ADMIN. */
    @PreAuthorize("hasAnyRole('USER', 'ADMIN')")
    public Mono<User> getUserById(Long id) {
        Mono<User> found = userRepository.findById(id);
        return found;
    }

    /** Loads one user; allowed for the principal with that id or for role ADMIN. */
    @PreAuthorize("#id == authentication.principal.id or hasRole('ADMIN')")
    public Mono<User> getOwnUser(Long id) {
        Mono<User> own = userRepository.findById(id);
        return own;
    }

    /** Creates a user; requires role ADMIN. */
    @PreAuthorize("hasRole('ADMIN')")
    public Mono<User> createUser(User user) {
        Mono<User> created = userRepository.save(user);
        return created;
    }

    /** Updates a user; allowed for the principal with the same id or for role ADMIN. */
    @PreAuthorize("#user.id == authentication.principal.id or hasRole('ADMIN')")
    public Mono<User> updateUser(User user) {
        Mono<User> updated = userRepository.save(user);
        return updated;
    }

    /** Deletes a user; requires role ADMIN. */
    @PreAuthorize("hasRole('ADMIN')")
    public Mono<Void> deleteUser(Long id) {
        Mono<Void> removal = userRepository.deleteById(id);
        return removal;
    }
}

5. 响应式测试支持

WebTestClient

  • 响应式 Web 应用测试工具
  • 支持对 WebFlux 应用进行集成测试
  • 提供流畅的 API 进行断言和验证

WebTestClient 示例

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;

/**
 * Slice test for {@code UserController}: the service is mocked and requests
 * are issued through {@link WebTestClient}.
 */
@WebFluxTest(UserController.class)
public class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    /** GET /api/users returns both stubbed users. */
    @Test
    public void testGetAllUsers() {
        // given
        User alice = new User(1L, "Alice", "alice@example.com", 25);
        User bob = new User(2L, "Bob", "bob@example.com", 30);
        given(userService.findAll()).willReturn(Flux.just(alice, bob));

        // when / then
        webTestClient.get().uri("/api/users").exchange()
            .expectStatus().isOk()
            .expectBodyList(User.class)
            .hasSize(2)
            .contains(alice, bob);
    }

    /** GET /api/users/1 returns the stubbed user. */
    @Test
    public void testGetUserById() {
        // given
        User alice = new User(1L, "Alice", "alice@example.com", 25);
        given(userService.findById(1L)).willReturn(Mono.just(alice));

        // when / then
        webTestClient.get().uri("/api/users/1").exchange()
            .expectStatus().isOk()
            .expectBody(User.class)
            .isEqualTo(alice);
    }

    /** GET /api/users/999 answers 404 when the service yields nothing. */
    @Test
    public void testGetUserByIdNotFound() {
        // given
        given(userService.findById(999L)).willReturn(Mono.empty());

        // when / then
        webTestClient.get().uri("/api/users/999").exchange()
            .expectStatus().isNotFound();
    }

    /** POST /api/users answers 201 with the saved user. */
    @Test
    public void testCreateUser() {
        // given
        User requested = new User(null, "Charlie", "charlie@example.com", 28);
        User persisted = new User(3L, "Charlie", "charlie@example.com", 28);
        given(userService.save(requested)).willReturn(Mono.just(persisted));

        // when / then
        webTestClient.post().uri("/api/users").bodyValue(requested).exchange()
            .expectStatus().isCreated()
            .expectBody(User.class)
            .isEqualTo(persisted);
    }

    /** PUT /api/users/1 answers 200 with the updated user. */
    @Test
    public void testUpdateUser() {
        // given
        User changed = new User(1L, "Alice Updated", "alice.new@example.com", 26);
        given(userService.update(1L, changed)).willReturn(Mono.just(changed));

        // when / then
        webTestClient.put().uri("/api/users/1").bodyValue(changed).exchange()
            .expectStatus().isOk()
            .expectBody(User.class)
            .isEqualTo(changed);
    }

    /** DELETE /api/users/1 answers 204 and calls the service. */
    @Test
    public void testDeleteUser() {
        // given
        given(userService.deleteById(1L)).willReturn(Mono.empty());

        // when / then
        webTestClient.delete().uri("/api/users/1").exchange()
            .expectStatus().isNoContent();

        verify(userService).deleteById(1L);
    }

    /** GET /api/users/search?name=Alice returns both stubbed matches. */
    @Test
    public void testSearchUsers() {
        // given
        User alice = new User(1L, "Alice", "alice@example.com", 25);
        User aliceSmith = new User(2L, "Alice Smith", "alice.smith@example.com", 30);
        given(userService.search("Alice", null)).willReturn(Flux.just(alice, aliceSmith));

        // when / then
        webTestClient.get()
            .uri(builder -> builder.path("/api/users/search").queryParam("name", "Alice").build())
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(User.class)
            .hasSize(2);
    }
}

StepVerifier

  • 用于测试 Reactor 流的测试工具
  • 支持对 Mono 和 Flux 进行精确测试
  • 提供时间虚拟化等高级测试功能

StepVerifier 示例

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.time.Duration;

/**
 * StepVerifier examples covering common {@link Mono} and {@link Flux} operators.
 */
public class ReactorTest {

    /** A Mono built with just() emits its value and completes. */
    @Test
    public void testMonoJust() {
        Mono<String> mono = Mono.just("Hello, Reactor!");

        StepVerifier.create(mono)
            .expectNext("Hello, Reactor!")
            .expectComplete()
            .verify();
    }

    /** A Mono built with error() terminates with that error. */
    @Test
    public void testMonoError() {
        Mono<String> mono = Mono.error(new RuntimeException("Test error"));

        StepVerifier.create(mono)
            .expectError(RuntimeException.class)
            .verify();
    }

    /** A Flux built with just() emits its values in order. */
    @Test
    public void testFluxJust() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);

        StepVerifier.create(flux)
            .expectNext(1)
            .expectNext(2)
            .expectNext(3)
            .expectNext(4)
            .expectNext(5)
            .expectComplete()
            .verify();
    }

    /** range(1, 10) emits ten elements. */
    @Test
    public void testFluxRange() {
        Flux<Integer> flux = Flux.range(1, 10);

        StepVerifier.create(flux)
            .expectNextCount(10)
            .expectComplete()
            .verify();
    }

    /** map() transforms every element. */
    @Test
    public void testFluxMap() {
        Flux<Integer> flux = Flux.range(1, 5)
            .map(i -> i * 2);

        StepVerifier.create(flux)
            .expectNext(2, 4, 6, 8, 10)
            .expectComplete()
            .verify();
    }

    /** filter() keeps only the even elements. */
    @Test
    public void testFluxFilter() {
        Flux<Integer> flux = Flux.range(1, 10)
            .filter(i -> i % 2 == 0);

        StepVerifier.create(flux)
            .expectNext(2, 4, 6, 8, 10)
            .expectComplete()
            .verify();
    }

    /** An exception thrown inside map() terminates the Flux after the elements before it. */
    @Test
    public void testFluxError() {
        Flux<String> flux = Flux.just("A", "B", "C")
            .map(s -> {
                if (s.equals("B")) {
                    throw new RuntimeException("Error on B");
                }
                return s;
            });

        StepVerifier.create(flux)
            .expectNext("A")
            .expectError(RuntimeException.class)
            .verify();
    }

    /** onErrorResume() switches to the fallback publisher when the source fails. */
    @Test
    public void testFluxOnErrorResume() {
        Flux<String> flux = Flux.just("A", "B", "C")
            .map(s -> {
                if (s.equals("B")) {
                    throw new RuntimeException("Error on B");
                }
                return s;
            })
            .onErrorResume(error -> Flux.just("Fallback"));

        StepVerifier.create(flux)
            .expectNext("A")
            .expectNext("Fallback")
            .expectComplete()
            .verify();
    }

    /** collectList() gathers all elements into one list. */
    @Test
    public void testFluxCollectList() {
        // java.util.List is written out in full: the snippet's imports do not include it.
        Mono<java.util.List<Integer>> mono = Flux.range(1, 5)
            .collectList();

        StepVerifier.create(mono)
            .expectNextMatches(list -> list.size() == 5)
            .expectComplete()
            .verify();
    }

    /** reduce() folds the elements into their sum. */
    @Test
    public void testFluxReduce() {
        Mono<Integer> mono = Flux.range(1, 5)
            .reduce((a, b) -> a + b);

        StepVerifier.create(mono)
            .expectNext(15)
            .expectComplete()
            .verify();
    }

    /** buffer(3) groups elements into lists of up to three. */
    @Test
    public void testFluxBuffer() {
        Flux<java.util.List<Integer>> flux = Flux.range(1, 10)
            .buffer(3);

        StepVerifier.create(flux)
            .expectNextMatches(list -> list.equals(java.util.List.of(1, 2, 3)))
            .expectNextMatches(list -> list.equals(java.util.List.of(4, 5, 6)))
            .expectNextMatches(list -> list.equals(java.util.List.of(7, 8, 9)))
            .expectNextMatches(list -> list.equals(java.util.List.of(10)))
            .expectComplete()
            .verify();
    }

    /** Time-based emission checked with virtual time, so the test does not actually wait. */
    @Test
    public void testFluxDelayElements() {
        // The publisher is created inside the supplier so that interval() is built
        // after withVirtualTime has put the virtual scheduler in place.
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofMillis(100)).take(3))
            .expectSubscription()
            .expectNoEvent(Duration.ofMillis(100))
            .expectNext(0L)
            .expectNoEvent(Duration.ofMillis(100))
            .expectNext(1L)
            .expectNoEvent(Duration.ofMillis(100))
            .expectNext(2L)
            .expectComplete()
            .verify();
    }

    /** timeout() switches to the fallback when the source is slower than the limit. */
    @Test
    public void testMonoTimeout() {
        // A delayed Mono under virtual time replaces the original Thread.sleep(2000):
        // nothing blocks the test thread and no InterruptedException is swallowed.
        StepVerifier.withVirtualTime(() ->
                Mono.delay(Duration.ofMillis(2000))
                    .thenReturn("Delayed result")
                    .timeout(Duration.ofMillis(1000), Mono.just("Timeout fallback")))
            .expectSubscription()
            .thenAwait(Duration.ofMillis(1000))
            .expectNext("Timeout fallback")
            .expectComplete()
            .verify();
    }

    /** merge() of two synchronous ranges yields all six elements. */
    @Test
    public void testFluxMerge() {
        Flux<Integer> flux1 = Flux.range(1, 3);
        Flux<Integer> flux2 = Flux.range(4, 3);

        Flux<Integer> merged = Flux.merge(flux1, flux2);

        // NOTE(review): merge() does not promise ordering in general. This exact
        // sequence presumably relies on both sources being synchronous; confirm.
        StepVerifier.create(merged)
            .expectNext(1, 2, 3, 4, 5, 6)
            .expectComplete()
            .verify();
    }

    /** zip() pairs elements of both sources by position. */
    @Test
    public void testFluxZip() {
        Flux<String> flux1 = Flux.just("A", "B", "C");
        Flux<Integer> flux2 = Flux.just(1, 2, 3);

        Flux<String> zipped = Flux.zip(flux1, flux2)
            .map(tuple -> tuple.getT1() + "-" + tuple.getT2());

        StepVerifier.create(zipped)
            .expectNext("A-1", "B-2", "C-3")
            .expectComplete()
            .verify();
    }

    /** distinct() drops repeated elements. */
    @Test
    public void testFluxDistinct() {
        Flux<Integer> flux = Flux.just(1, 2, 2, 3, 3, 3, 4)
            .distinct();

        StepVerifier.create(flux)
            .expectNext(1, 2, 3, 4)
            .expectComplete()
            .verify();
    }

    /** skip(2) followed by take(5) yields elements 3 through 7. */
    @Test
    public void testFluxTakeAndSkip() {
        Flux<Integer> flux = Flux.range(1, 10)
            .skip(2)
            .take(5);

        StepVerifier.create(flux)
            .expectNext(3, 4, 5, 6, 7)
            .expectComplete()
            .verify();
    }
}

6. 响应式编程优势

资源利用率

  • 少量固定线程处理大量并发请求
  • 减少线程上下文切换开销
  • 提高系统吞吐量和可伸缩性

弹性和容错

  • 内置超时、重试、熔断等机制
  • 优雅的错误处理和降级策略
  • 更好的系统弹性

7. 响应式编程模型对比

graph TD
    A[传统阻塞模型] --> B[每个请求一个线程]
    C[响应式非阻塞模型] --> D[少量线程处理大量请求]
    
    B --> E[线程等待 I/O]
    B --> F[资源消耗大]
    B --> G[并发能力受限]
    
    D --> H[事件驱动]
    D --> I[资源消耗小]
    D --> J[高并发能力]

8. 响应式编程适用场景

  • 高并发、低延迟的应用
  • 流式数据处理应用
  • 实时通信应用(如聊天、推送)
  • 微服务架构中的服务间通信
  • I/O 密集型应用

9. 响应式高级特性

响应式 Kafka

  • 完全非阻塞的 Kafka 生产者和消费者
  • 支持背压和流控制
  • 与 Spring Boot 无缝集成

响应式 Kafka 配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Bean definitions for the reactive Kafka consumer and producer templates.
 */
@Configuration
public class ReactiveKafkaConfig {

    /**
     * Receiver options for String keys and values, subscribed to {@code test-topic}.
     *
     * @return receiver options built from the consumer properties
     */
    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions() {
        ReceiverOptions<String, String> base = ReceiverOptions.create(consumerProperties());
        return base.subscription(Collections.singletonList("test-topic"));
    }

    /**
     * @param receiverOptions options the consumer template is built from
     * @return consumer template for String keys and values
     */
    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
        ReceiverOptions<String, String> receiverOptions
    ) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    /**
     * Sender options for String keys and values.
     *
     * @return sender options built from the producer properties
     */
    @Bean
    public SenderOptions<String, String> kafkaSenderOptions() {
        return SenderOptions.create(producerProperties());
    }

    /**
     * @param senderOptions options the producer template is built from
     * @return producer template for String keys and values
     */
    @Bean
    public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate(
        SenderOptions<String, String> senderOptions
    ) {
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    /** Consumer settings: broker address, group id, String deserializers, offset reset. */
    private static Map<String, Object> consumerProperties() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return settings;
    }

    /** Producer settings: broker address, String serializers, acks and retries. */
    private static Map<String, Object> producerProperties() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.ACKS_CONFIG, "all");
        settings.put(ProducerConfig.RETRIES_CONFIG, 3);
        return settings;
    }
}

响应式 Kafka 消费者

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;

/**
 * Reactive Kafka consumer examples built on {@code ReactiveKafkaConsumerTemplate}.
 */
@Service
public class KafkaConsumerService {

    private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;

    /**
     * @param kafkaConsumerTemplate template the records are received from
     */
    public KafkaConsumerService(
        ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate
    ) {
        this.kafkaConsumerTemplate = kafkaConsumerTemplate;
    }

    /**
     * Streams record values, logging value, partition and offset of each record.
     *
     * <p>NOTE(review): this pipeline never acknowledges or commits offsets;
     * confirm whether that is intended for this example.
     *
     * @return the value of every received record
     */
    public Flux<String> consumeMessages() {
        return kafkaConsumerTemplate.receive()
            .doOnNext(record -> {
                System.out.println("Received message: " + record.value());
                System.out.println("Partition: " + record.partition());
                System.out.println("Offset: " + record.offset());
            })
            .doOnError(error -> System.err.println("Error consuming message: " + error))
            .map(ReceiverRecord::value);
    }

    /**
     * Processes each record and commits its offset before emitting the result.
     *
     * @return the processed form of every received record
     */
    public Flux<String> consumeAndProcessMessages() {
        return kafkaConsumerTemplate.receive()
            // concatMap handles one record at a time, so offsets are committed in
            // the order received; flatMap would let commits run concurrently.
            .concatMap(record -> {
                String processedMessage = processMessage(record.value());

                // Commit manually, then emit the processed value.
                return record.receiverOffset()
                    .commit()
                    .thenReturn(processedMessage);
            })
            // processMessage already adds the "Processed: " prefix; print as is.
            .doOnNext(message -> System.out.println(message))
            .doOnError(error -> System.err.println("Error processing message: " + error));
    }

    /**
     * Streams record values grouped into batches.
     *
     * @param batchSize maximum number of values per batch
     * @return lists of up to {@code batchSize} record values
     */
    public Flux<java.util.List<String>> consumeMessagesBatch(int batchSize) {
        return kafkaConsumerTemplate.receive()
            .map(ReceiverRecord::value)
            .buffer(batchSize)
            .doOnNext(batch -> System.out.println("Processing batch of " + batch.size() + " messages"));
    }

    /** Simulated processing: upper-cases the message and prefixes it with "Processed: ". */
    private String processMessage(String message) {
        return "Processed: " + message.toUpperCase();
    }
}

响应式 Kafka 生产者

import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/**
 * Reactive Kafka producer facade built on {@code ReactiveKafkaProducerTemplate}.
 * <p>
 * All methods are lazy: nothing is sent until the returned publisher is subscribed.
 */
@Service
public class KafkaProducerService {
    
    private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;
    
    /**
     * @param kafkaProducerTemplate template used for every send operation
     */
    public KafkaProducerService(
        ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate
    ) {
        this.kafkaProducerTemplate = kafkaProducerTemplate;
    }
    
    /**
     * Sends a single message and logs the topic, partition and offset it was written to.
     *
     * @param topic destination topic
     * @param key   record key (may be {@code null})
     * @param value record value
     * @return a Mono that completes when the send has finished, or errors if it failed
     */
    public Mono<Void> sendMessage(String topic, String key, String value) {
        return kafkaProducerTemplate.send(topic, key, value)
            // doOnNext only runs when a result is actually emitted, so the metadata
            // is never dereferenced on a null value.
            .doOnNext(result -> {
                System.out.println("Message sent successfully");
                System.out.println("Topic: " + result.recordMetadata().topic());
                System.out.println("Partition: " + result.recordMetadata().partition());
                System.out.println("Offset: " + result.recordMetadata().offset());
            })
            .doOnError(error -> System.err.println("Error sending message: " + error))
            .then();
    }
    
    /**
     * Sends a single message and exposes the sender result to the caller.
     * <p>
     * The plain {@code send(topic, key, value)} overload produces a result without
     * correlation metadata ({@code SenderResult<Void>}), which does not match this
     * method's return type. The record is therefore wrapped in a {@link SenderRecord}
     * that carries the value as its correlation metadata.
     *
     * @param topic destination topic
     * @param key   record key (may be {@code null})
     * @param value record value; also used as the correlation metadata of the result
     * @return the sender result for the record
     */
    public Mono<SenderResult<String>> sendMessageWithResult(String topic, String key, String value) {
        SenderRecord<String, String, String> senderRecord = SenderRecord.create(
            new org.apache.kafka.clients.producer.ProducerRecord<>(topic, key, value),
            value
        );
        return kafkaProducerTemplate.send(senderRecord)
            .doOnSuccess(result -> System.out.println("Message sent: " + value))
            .doOnError(error -> System.err.println("Error sending message: " + error));
    }
    
    /**
     * Sends every message of the list to the topic with a {@code null} key.
     * <p>
     * Each message is used as the correlation metadata of its own result. Sends are
     * issued through {@code flatMap}, so results are not guaranteed to arrive in list order.
     *
     * @param topic    destination topic
     * @param messages values to send
     * @return one sender result per message
     */
    public Flux<SenderResult<String>> sendMessagesBatch(
        String topic,
        List<String> messages
    ) {
        return Flux.fromIterable(messages)
            .flatMap(message ->
                kafkaProducerTemplate.send(SenderRecord.create(
                    new org.apache.kafka.clients.producer.ProducerRecord<>(topic, null, message),
                    message
                ))
            )
            .doOnNext(result ->
                System.out.println("Sent message: " + result.correlationMetadata())
            )
            .doOnError(error ->
                System.err.println("Error sending batch: " + error)
            );
    }
    
    /**
     * Sends a message with an explicit record timestamp; the partition is left unset.
     *
     * @param topic     destination topic
     * @param key       record key
     * @param value     record value
     * @param timestamp record timestamp passed to the producer record
     * @return a Mono that completes when the send has finished
     */
    public Mono<Void> sendMessageWithTimestamp(
        String topic,
        String key,
        String value,
        long timestamp
    ) {
        return kafkaProducerTemplate.send(
            new org.apache.kafka.clients.producer.ProducerRecord<>(
                topic,
                null,
                timestamp,
                key,
                value
            )
        ).then();
    }
    
    /**
     * Sends a message to a specific partition of the topic.
     *
     * @param topic     destination topic
     * @param partition target partition
     * @param key       record key
     * @param value     record value
     * @return a Mono that completes when the send has finished
     */
    public Mono<Void> sendMessageToPartition(
        String topic,
        int partition,
        String key,
        String value
    ) {
        return kafkaProducerTemplate.send(
            new org.apache.kafka.clients.producer.ProducerRecord<>(
                topic,
                partition,
                key,
                value
            )
        ).then();
    }
}

响应式 Kafka 监听器

import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;

import jakarta.annotation.PostConstruct;

@Component
public class KafkaMessageListener {
    
    private final KafkaConsumerService kafkaConsumerService;
    private final KafkaProducerService kafkaProducerService;
    
    public KafkaMessageListener(
        KafkaConsumerService kafkaConsumerService,
        KafkaProducerService kafkaProducerService
    ) {
        this.kafkaConsumerService = kafkaConsumerService;
        this.kafkaProducerService = kafkaProducerService;
    }
    
    @PostConstruct
    public void startListening() {
        kafkaConsumerService.consumeAndProcessMessages()
            .subscribe(
                message -> System.out.println("Processed message: " + message),
                error -> System.err.println("Error in listener: " + error),
                () -> System.out.println("Listener completed")
            );
    }
    
    // 请求-响应模式
    public Flux<String> requestResponsePattern(String requestTopic, String responseTopic) {
        return kafkaConsumerService.consumeMessages()
            .flatMap(request -> {
                // 处理请求
                String response = "Response to: " + request;
                
                // 发送响应
                return kafkaProducerService.sendMessage(responseTopic, null, response)
                    .thenReturn(response);
            });
    }
}

响应式 WebSocket

  • 完全非阻塞的 WebSocket 通信
  • 支持双向实时通信
  • 与 Spring WebFlux 无缝集成

WebSocket 配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

/**
 * Registers the WebSocket endpoints {@code /ws/echo} and {@code /ws/chat} together with
 * the adapter that lets WebFlux dispatch to {@link WebSocketHandler} instances.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Maps the WebSocket URLs to their handlers.
     *
     * @param echoHandler handler served under {@code /ws/echo}
     * @return a URL handler mapping with order 1
     */
    @Bean
    public HandlerMapping webSocketHandlerMapping(WebSocketHandler echoHandler) {
        Map<String, WebSocketHandler> routes = new HashMap<>();
        routes.put("/ws/echo", echoHandler);
        routes.put("/ws/chat", chatHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(1);
        mapping.setUrlMap(routes);
        return mapping;
    }

    /**
     * Handler that logs every incoming text frame and sends it back to the same
     * session prefixed with {@code "Echo: "}.
     */
    @Bean
    public WebSocketHandler echoHandler() {
        return session -> {
            return session.send(
                session.receive().map(incoming -> {
                    String text = incoming.getPayloadAsText();
                    System.out.println("Received: " + text);
                    return session.textMessage("Echo: " + text);
                })
            );
        };
    }

    /**
     * Handler that logs every incoming text frame and replies on the same session
     * with a {@code "Broadcast: "} prefix. It does not fan out to other sessions.
     */
    @Bean
    public WebSocketHandler chatHandler() {
        return session -> session.send(
            session.receive().map(incoming -> {
                String text = incoming.getPayloadAsText();
                System.out.println("Chat message: " + text);
                return session.textMessage("Broadcast: " + text);
            })
        );
    }

    /** Adapter bean required for dispatching requests to {@link WebSocketHandler}s. */
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

WebSocket 服务

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Keeps track of open WebSocket sessions and offers helpers for pushing text
 * messages to one, all, or all-but-one of them.
 */
@Service
public class WebSocketService {

    /** Sink that {@link #broadcast(String)} emits into; nothing in this class subscribes to it. */
    private final Sinks.Many<String> messageSink = Sinks.many().multicast().onBackpressureBuffer();

    /** Open sessions keyed by the id supplied to {@link #addSession(String, WebSocketSession)}. */
    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    /**
     * Registers a session under the given id, replacing any previous entry.
     *
     * @param sessionId key to store the session under
     * @param session   session to register
     */
    public void addSession(String sessionId, WebSocketSession session) {
        sessions.put(sessionId, session);
        System.out.println("Session added: " + sessionId);
    }

    /**
     * Removes the session registered under the given id, if any.
     *
     * @param sessionId key of the session to remove
     */
    public void removeSession(String sessionId) {
        sessions.remove(sessionId);
        System.out.println("Session removed: " + sessionId);
    }

    /**
     * Emits the message into the sink and sends it to every registered session.
     * The result of the sink emission is ignored.
     *
     * @param message text to send
     * @return a Mono that completes when all sends have completed
     */
    public Mono<Void> broadcast(String message) {
        messageSink.tryEmitNext(message);

        return Flux.fromIterable(sessions.values())
            .flatMap(target -> pushText(target, message))
            .then();
    }

    /**
     * Sends the message to a single session.
     *
     * @param sessionId id of the target session
     * @param message   text to send
     * @return the send Mono, or an empty Mono when no session is registered under the id
     */
    public Mono<Void> sendToSession(String sessionId, String message) {
        WebSocketSession target = sessions.get(sessionId);
        if (target == null) {
            return Mono.empty();
        }
        return pushText(target, message);
    }

    /**
     * Emits a heartbeat string containing the current time every 30 seconds.
     *
     * @return an endless heartbeat stream
     */
    public Flux<String> heartbeat() {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(30));
        return ticks.map(tick -> "Heartbeat: " + System.currentTimeMillis());
    }

    /**
     * Logs a chat message and relays it, prefixed with the sender's id, to every
     * registered session except the sender's own.
     *
     * @param session sender's session
     * @param message chat text
     * @return a Mono that completes when all relays have completed
     */
    public Mono<Void> handleChatMessage(WebSocketSession session, String message) {
        System.out.println("Chat message from " + session.getId() + ": " + message);

        String senderId = session.getId();
        String relayed = senderId + ": " + message;
        return Flux.fromIterable(sessions.values())
            .filter(peer -> !peer.getId().equals(senderId))
            .flatMap(peer -> pushText(peer, relayed))
            .then();
    }

    /** Sends one text frame on the given session. */
    private Mono<Void> pushText(WebSocketSession target, String text) {
        return target.send(Mono.just(target.textMessage(text)));
    }
}

WebSocket 处理器

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

/**
 * WebSocket handler for chat sessions: registers the session, relays every incoming
 * message to the other sessions and echoes it back to the sender.
 */
@Component
public class ChatWebSocketHandler implements WebSocketHandler {
    
    private final WebSocketService webSocketService;
    
    /**
     * @param webSocketService session registry and relay service
     */
    public ChatWebSocketHandler(WebSocketService webSocketService) {
        this.webSocketService = webSocketService;
    }
    
    /**
     * Handles one WebSocket session for its whole lifetime.
     * <p>
     * The relay to other sessions is composed into the pipeline with {@code concatMap}
     * instead of being started with a nested {@code subscribe()} inside {@code doOnNext}.
     * The nested subscription was detached from the session: it was not cancelled when
     * the session ended and its errors were not handled. Relay failures are now logged
     * and skipped so the sender's own session keeps working, and the echo is sent after
     * the relay attempt has finished.
     *
     * @param session the session being handled
     * @return a Mono that completes when the session's outbound stream ends
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String sessionId = session.getId();
        webSocketService.addSession(sessionId, session);
        
        return session.send(
            session.receive()
                .concatMap(message -> {
                    String payload = message.getPayloadAsText();
                    System.out.println("Received from " + sessionId + ": " + payload);
                    
                    return webSocketService.handleChatMessage(session, payload)
                        // Best effort: a failing peer must not terminate this session.
                        .onErrorResume(error -> {
                            System.err.println(
                                "Failed to relay message from " + sessionId + ": " + error);
                            return Mono.empty();
                        })
                        // Build the echo frame lazily, only once the relay has finished.
                        .then(Mono.fromSupplier(() -> session.textMessage("You: " + payload)));
                })
        ).doFinally(signalType -> {
            // Unregister the session however the stream ended.
            webSocketService.removeSession(sessionId);
        });
    }
}

响应式 HTTP 客户端

  • 非阻塞的 HTTP 客户端
  • 支持流式响应
  • 与 WebClient 无缝集成

WebClient 配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Configuration
public class WebClientConfig {
    
    @Bean
    public WebClient webClient() {
        // 配置 HttpClient
        HttpClient httpClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofSeconds(5))
            .doOnConnected(conn ->
                conn.addHandlerLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS))
                    .addHandlerLast(new WriteTimeoutHandler(10, TimeUnit.SECONDS))
            );
        
        // 创建 WebClient
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl("https://api.example.com")
            .defaultHeader("Content-Type", "application/json")
            .build();
    }
    
    @Bean
    public WebClient webClientWithRetry() {
        HttpClient httpClient = HttpClient.create()
            .responseTimeout(Duration.ofSeconds(5));
        
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl("https://api.example.com")
            .build();
    }
}

WebClient 服务

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;

/**
 * Thin reactive client for the user and file endpoints of a remote API, built on
 * {@link WebClient}. All methods are lazy: no request is sent until subscription.
 */
@Service
public class ApiClientService {
    
    private final WebClient webClient;
    
    /**
     * @param webClient client used for every request
     */
    public ApiClientService(WebClient webClient) {
        this.webClient = webClient;
    }
    
    /**
     * GET {@code /users/{id}}.
     * <p>
     * The success log uses {@code doOnNext}: {@code doOnSuccess} is also invoked with
     * {@code null} when the response has no body, which made {@code user.getName()}
     * throw a {@code NullPointerException}.
     *
     * @param id user id
     * @return the user, or empty if the response carries no body
     */
    public Mono<User> getUser(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .doOnNext(user -> System.out.println("User fetched: " + user.getName()))
            .doOnError(error -> System.err.println("Error fetching user: " + error));
    }
    
    /**
     * POST {@code /users} with the user as request body.
     * <p>
     * Logs with {@code doOnNext} so an empty response body does not cause a
     * {@code NullPointerException} in the logging callback.
     *
     * @param user user to create
     * @return the created user as returned by the server
     */
    public Mono<User> createUser(User user) {
        return webClient.post()
            .uri("/users")
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class)
            .doOnNext(createdUser ->
                System.out.println("User created: " + createdUser.getId())
            );
    }
    
    /**
     * PUT {@code /users/{id}} with the user as request body.
     *
     * @param id   user id
     * @param user new user state
     * @return the updated user as returned by the server
     */
    public Mono<User> updateUser(Long id, User user) {
        return webClient.put()
            .uri("/users/{id}", id)
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class);
    }
    
    /**
     * DELETE {@code /users/{id}}.
     *
     * @param id user id
     * @return a Mono that completes when the request has finished
     */
    public Mono<Void> deleteUser(Long id) {
        return webClient.delete()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(Void.class);
    }
    
    /**
     * GET {@code /users} decoded as a stream; each user's name is printed as it arrives.
     *
     * @return all users
     */
    public Flux<User> getAllUsers() {
        return webClient.get()
            .uri("/users")
            .retrieve()
            .bodyToFlux(User.class)
            .doOnNext(user -> System.out.println("User: " + user.getName()));
    }
    
    /**
     * GET {@code /users/{id}} retried up to 3 times with exponential backoff starting
     * at 1 second. Only 5xx {@link WebClientResponseException}s are retried.
     *
     * @param id user id
     * @return the user
     */
    public Mono<User> getUserWithRetry(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(throwable ->
                    throwable instanceof WebClientResponseException &&
                    ((WebClientResponseException) throwable).getStatusCode().is5xxServerError()
                )
            );
    }
    
    /**
     * GET {@code /users/{id}} that errors if no result arrives within 3 seconds.
     *
     * @param id user id
     * @return the user
     */
    public Mono<User> getUserWithTimeout(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .timeout(Duration.ofSeconds(3));
    }
    
    /**
     * Fetches several users via {@code flatMap}; results are emitted as they arrive,
     * not necessarily in the order of {@code ids}.
     *
     * @param ids user ids
     * @return the fetched users
     */
    public Flux<User> batchGetUsers(List<Long> ids) {
        return Flux.fromIterable(ids)
            .flatMap(id ->
                webClient.get()
                    .uri("/users/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
            );
    }
    
    /**
     * Same requests as {@link #batchGetUsers(List)}, collected into a single list.
     *
     * @param ids user ids
     * @return all fetched users once every request has completed
     */
    public Mono<List<User>> concurrentGetUsers(List<Long> ids) {
        return Flux.fromIterable(ids)
            .flatMap(id ->
                webClient.get()
                    .uri("/users/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
            )
            .collectList();
    }
    
    /**
     * GET {@code /users/{id}} that never errors: every failure is logged and turned
     * into an empty result.
     * <p>
     * The {@code onStatus} handlers turn 4xx and 5xx responses into plain
     * {@code RuntimeException}s, so those cases are handled by the generic
     * {@code Exception} fallback rather than the {@code WebClientResponseException} one.
     *
     * @param id user id
     * @return the user, or empty on any error
     */
    public Mono<User> getUserWithErrorHandling(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .onStatus(
                status -> status.is4xxClientError(),
                response -> Mono.error(new RuntimeException("Client error"))
            )
            .onStatus(
                status -> status.is5xxServerError(),
                response -> Mono.error(new RuntimeException("Server error"))
            )
            .bodyToMono(User.class)
            .onErrorResume(WebClientResponseException.class, ex -> {
                System.err.println("HTTP Error: " + ex.getStatusCode());
                return Mono.empty();
            })
            .onErrorResume(Exception.class, ex -> {
                System.err.println("Error: " + ex.getMessage());
                return Mono.empty();
            });
    }
    
    /**
     * POST {@code /upload} with the raw bytes as request body.
     * <p>
     * NOTE(review): {@code fileName} is not sent with the request, and the body is
     * posted as-is rather than as multipart form data.
     *
     * @param fileName    currently unused
     * @param fileContent bytes to upload
     * @return the response body as a string
     */
    public Mono<String> uploadFile(String fileName, byte[] fileContent) {
        return webClient.post()
            .uri("/upload")
            .bodyValue(fileContent)
            .retrieve()
            .bodyToMono(String.class);
    }
    
    /**
     * GET {@code /files/{id}} with the whole response body collected into one byte array.
     *
     * @param fileId file id
     * @return the file content
     */
    public Mono<byte[]> downloadFile(String fileId) {
        return webClient.get()
            .uri("/files/{id}", fileId)
            .retrieve()
            .bodyToMono(byte[].class);
    }
}

响应式缓存

  • 非阻塞的缓存操作
  • 支持 Redis、Caffeine 等缓存实现
  • 与 Spring Cache 无缝集成

响应式缓存配置

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.ReactiveRedisCacheConfiguration;
import org.springframework.data.redis.cache.ReactiveRedisCacheManager;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * Enables annotation-driven caching and declares a Redis-backed cache manager whose
 * entries are serialised as JSON and expire after 10 minutes.
 * <p>
 * NOTE(review): confirm that {@code ReactiveRedisCacheManager} and
 * {@code ReactiveRedisCacheConfiguration} exist in the Spring Data Redis version in
 * use; the commonly documented types are {@code RedisCacheManager} and
 * {@code RedisCacheConfiguration}.
 */
@Configuration
@EnableCaching
public class ReactiveCacheConfig {

    /**
     * Builds the cache manager with JSON value serialisation, a 10 minute TTL and
     * null-value caching disabled.
     *
     * @param connectionFactory reactive Redis connection factory
     * @return the configured cache manager
     */
    @Bean
    public ReactiveRedisCacheManager cacheManager(
        ReactiveRedisConnectionFactory connectionFactory
    ) {
        GenericJackson2JsonRedisSerializer json = new GenericJackson2JsonRedisSerializer();
        RedisSerializationContext.SerializationPair<Object> valuePair =
            RedisSerializationContext.SerializationPair.fromSerializer(json);

        ReactiveRedisCacheConfiguration defaults =
            ReactiveRedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(10))
                .disableCachingNullValues()
                .serializeValuesWith(valuePair);

        return ReactiveRedisCacheManager.builder(connectionFactory)
            .cacheDefaults(defaults)
            .build();
    }
}

响应式缓存服务

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * User lookups and updates decorated with Spring cache annotations on the
 * {@code "users"} cache.
 * <p>
 * NOTE(review): whether the cache stores the emitted value or the publisher object
 * for {@code Mono}/{@code Flux} return types depends on the Spring Framework
 * version — confirm for the version in use.
 */
@Service
public class CacheService {

    private final UserRepository userRepository;

    /**
     * @param userRepository repository the cache sits in front of
     */
    public CacheService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * Cached read keyed by {@code id}.
     *
     * @param id user id
     * @return the user from the repository
     */
    @Cacheable(value = "users", key = "#id")
    public Mono<User> getUserById(Long id) {
        trace("Fetching user from database: " + id);
        Mono<User> found = userRepository.findById(id);
        return found;
    }

    /**
     * Saves the user and refreshes the cache entry keyed by the user's id.
     *
     * @param user user to save
     * @return the saved user
     */
    @CachePut(value = "users", key = "#user.id")
    public Mono<User> updateUser(User user) {
        trace("Updating user in database: " + user.getId());
        Mono<User> saved = userRepository.save(user);
        return saved;
    }

    /**
     * Deletes the user and evicts the cache entry keyed by {@code id}.
     *
     * @param id user id
     * @return completion signal of the delete
     */
    @CacheEvict(value = "users", key = "#id")
    public Mono<Void> deleteUser(Long id) {
        trace("Deleting user from database: " + id);
        Mono<Void> removed = userRepository.deleteById(id);
        return removed;
    }

    /**
     * Evicts every entry of the {@code "users"} cache; performs no repository call.
     *
     * @return an empty Mono
     */
    @CacheEvict(value = "users", allEntries = true)
    public Mono<Void> clearAllCache() {
        trace("Clearing all user cache");
        return Mono.empty();
    }

    /**
     * Cached read of all users under the fixed key {@code 'all'}.
     *
     * @return all users from the repository
     */
    @Cacheable(value = "users", key = "'all'")
    public Flux<User> getAllUsers() {
        trace("Fetching all users from database");
        Flux<User> everyone = userRepository.findAll();
        return everyone;
    }

    /**
     * Cached read keyed by {@code id} that skips caching when the result is {@code null}.
     *
     * @param id user id
     * @return the user from the repository
     */
    @Cacheable(value = "users", key = "#id", unless = "#result == null")
    public Mono<User> getUserByIdConditional(Long id) {
        Mono<User> found = userRepository.findById(id);
        return found;
    }

    /** Prints one diagnostic line to stdout. */
    private void trace(String line) {
        System.out.println(line);
    }
}

Spring 的响应式编程特性提供了一套完整的工具链,使开发者能够构建高效、可伸缩的响应式应用程序,同时保持了 Spring 框架一贯的开发便利性。

10. 性能对比

传统阻塞模型 vs 响应式非阻塞模型

吞吐量对比

| 指标 | 传统阻塞模型 | 响应式非阻塞模型 |
| --- | --- | --- |
| 单机并发连接数 | ~200-500 | ~10,000-100,000 |
| 请求响应时间 | 100-500ms | 10-50ms |
| CPU 利用率 | 30-50%(线程阻塞) | 70-90%(持续处理) |
| 内存占用 | 高(每个线程栈 ~1MB) | 低(事件循环栈 ~256KB) |
| 线程上下文切换 | 频繁 | 极少 |

资源利用率对比

graph LR
    A[传统阻塞模型] --> B[线程池: 200-500 线程]
    C[响应式非阻塞模型] --> D[事件循环: 少量线程]
    
    B --> E[内存: 200-500MB]
    B --> F[CPU: 30-50%]
    B --> G[并发: 200-500]
    
    D --> H[内存: 10-50MB]
    D --> I[CPU: 70-90%]
    D --> J[并发: 10,000+]

性能基准测试示例

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Spring Boot entry point for the blocking-versus-reactive comparison example.
 */
@SpringBootApplication
public class PerformanceComparisonApp {
    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(PerformanceComparisonApp.class, args);
    }
}

@RestController
class PerformanceController {
    
    private final ExecutorService executorService = Executors.newFixedThreadPool(200);
    
    // 传统阻塞端点
    @GetMapping("/blocking")
    public Mono<String> blockingEndpoint() {
        return Mono.fromCallable(() -> {
            try {
                Thread.sleep(100); // 模拟 I/O 操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Blocking response";
        }).subscribeOn(Schedulers.boundedElastic());
    }
    
    // 响应式非阻塞端点
    @GetMapping("/reactive")
    public Mono<String> reactiveEndpoint() {
        return Mono.fromCallable(() -> {
            try {
                Thread.sleep(100); // 模拟 I/O 操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Reactive response";
        }).subscribeOn(Schedulers.boundedElastic());
    }
    
    // 批量处理对比
    @GetMapping("/batch-blocking")
    public Flux<String> batchBlocking() {
        return Flux.range(1, 100)
            .flatMap(i -> Mono.fromCallable(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Item " + i;
            }).subscribeOn(Schedulers.boundedElastic()));
    }
    
    // 响应式批量处理
    @GetMapping("/batch-reactive")
    public Flux<String> batchReactive() {
        return Flux.range(1, 100)
            .flatMap(i -> Mono.fromCallable(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Item " + i;
            }).subscribeOn(Schedulers.parallel()));
    }
}

性能优化建议

1. 线程池优化

import reactor.core.scheduler.Schedulers;
import java.util.concurrent.Executors;

// Custom fixed-size pool with two threads per available processor.
// NOTE(review): none of the pipelines below actually use this executor.
int poolSize = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService customExecutor = Executors.newFixedThreadPool(poolSize);

// CPU-bound work: split into parallel rails and run them on the parallel scheduler
Flux.range(1, 1000)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(n -> heavyComputation(n))
    .sequential()
    .subscribe();

// I/O-bound work: wrap each blocking call and subscribe it on boundedElastic
Flux.range(1, 1000)
    .flatMap(n ->
        Mono.fromCallable(() -> ioOperation(n)).subscribeOn(Schedulers.boundedElastic()))
    .subscribe();

// Single-threaded execution: everything after publishOn runs on the single scheduler
Flux.range(1, 1000)
    .publishOn(Schedulers.single())
    .map(n -> n * 2)
    .subscribe();

2. 连接池配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import java.time.Duration;

/**
 * Declares a {@link WebClient} backed by a custom Reactor Netty connection pool.
 */
@Configuration
public class WebClientPoolConfig {

    /**
     * Builds a WebClient for {@code https://api.example.com} using a pool named
     * {@code "custom"} with up to 500 connections, a 5 s response timeout and
     * keep-alive enabled.
     */
    @Bean
    public WebClient optimizedWebClient() {
        Duration idleLimit = Duration.ofSeconds(20);       // max time a connection may sit idle
        Duration lifeLimit = Duration.ofSeconds(60);       // max total lifetime of a connection
        Duration acquireLimit = Duration.ofSeconds(60);    // max wait for a free connection
        Duration evictionPeriod = Duration.ofSeconds(120); // background eviction interval

        ConnectionProvider pool = ConnectionProvider.builder("custom")
            .maxConnections(500)
            .maxIdleTime(idleLimit)
            .maxLifeTime(lifeLimit)
            .pendingAcquireTimeout(acquireLimit)
            .evictInBackground(evictionPeriod)
            .build();

        HttpClient nettyClient = HttpClient.create(pool)
            .responseTimeout(Duration.ofSeconds(5))
            .keepAlive(true);

        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(nettyClient);
        return WebClient.builder()
            .clientConnector(connector)
            .baseUrl("https://api.example.com")
            .build();
    }
}

3. 数据库连接优化

import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;

@Configuration
public class R2dbcPoolConfig extends AbstractR2dbcConfiguration {
    
    @Override
    protected String connectionUrl() {
        // 配置连接池参数
        return "r2dbc:postgresql://localhost:5432/reactor_db" +
            "?poolSize=50" + // 连接池大小
            "&maxIdleTime=PT30S" + // 最大空闲时间
            "&maxLifeTime=PT10M" + // 连接最大生命周期
            "&acquireRetryAttempts=3"; // 获取连接重试次数
    }
    
    @Override
    protected String username() {
        return "postgres";
    }
    
    @Override
    protected String password() {
        return "password";
    }
}

11. 最佳实践

1. 错误处理最佳实践

错误恢复策略

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;

public class ErrorHandlingBestPractices {
    
    // 1. 使用 onErrorResume 提供降级方案
    public Mono<String> getUserWithFallback(Long id) {
        return userRepository.findById(id)
            .onErrorResume(UserNotFoundException.class, ex -> {
                // 特定异常的降级处理
                return Mono.just("Default User");
            })
            .onErrorResume(Exception.class, ex -> {
                // 通用异常的降级处理
                return Mono.just("Error: " + ex.getMessage());
            });
    }
    
    // 2. 使用 retry 进行智能重试
    public Mono<String> getUserWithRetry(Long id) {
        return userRepository.findById(id)
            .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
                .filter(throwable ->
                    throwable instanceof TimeoutException ||
                    throwable instanceof NetworkException
                )
                .doBeforeRetry(signal ->
                    System.out.println("Retry attempt: " + signal.totalRetries())
                )
            );
    }
    
    // 3. 使用 onErrorReturn 返回默认值
    public Flux<String> getAllUsersWithDefault() {
        return userRepository.findAll()
            .onErrorReturn("Error", "User1", "User2");
    }
    
    // 4. 使用 doOnError 记录错误
    public Mono<User> getUserWithLogging(Long id) {
        return userRepository.findById(id)
            .doOnError(error ->
                logger.error("Failed to fetch user {}: {}", id, error.getMessage())
            );
    }
    
    // 5. 使用 switchIfEmpty 处理空结果
    public Mono<User> getUserOrThrow(Long id) {
        return userRepository.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException("User not found")));
    }
}

2. 背压处理最佳实践

import reactor.core.publisher.Flux;
import java.time.Duration;

/**
 * Examples of backpressure strategies applied to a source that ticks every 10 ms.
 */
public class BackpressureBestPractices {

    /**
     * 1. {@code onBackpressureBuffer}: for cases where excess items may be buffered.
     * Up to 100 items are buffered; the pipeline is subscribed on boundedElastic.
     */
    public Flux<String> processWithBuffer() {
        return fastTicks()
            .onBackpressureBuffer(100)
            .flatMap(tick -> processItem(tick))
            .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * 2. {@code onBackpressureDrop}: for cases where losing data is acceptable.
     * Every dropped item is logged at warn level.
     */
    public Flux<String> processWithDrop() {
        return fastTicks()
            .onBackpressureDrop(lost -> logger.warn("Dropped item: {}", lost))
            .flatMap(tick -> processItem(tick));
    }

    /**
     * 3. {@code onBackpressureLatest}: for cases where only the newest value matters.
     */
    public Flux<String> processWithLatest() {
        return fastTicks()
            .onBackpressureLatest()
            .flatMap(tick -> processItem(tick));
    }

    /**
     * 4. {@code limitRate}: caps how many items are requested from upstream at a
     * time (here 100). It bounds the request batch size, not items per second.
     */
    public Flux<String> processWithRateLimit() {
        return fastTicks()
            .limitRate(100)
            .flatMap(tick -> processItem(tick));
    }

    /**
     * 5. {@code sample}: down-samples the stream, keeping one item per 100 ms window.
     */
    public Flux<String> processWithSampling() {
        return fastTicks()
            .sample(Duration.ofMillis(100))
            .flatMap(tick -> processItem(tick));
    }

    /** Fresh 10 ms interval source used by every example. */
    private static Flux<Long> fastTicks() {
        return Flux.interval(Duration.ofMillis(10));
    }
}

3. 资源管理最佳实践

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.sql.Connection;
import java.sql.DriverManager;

/**
 * Examples of tying resource clean-up to the lifecycle of a reactive stream.
 */
public class ResourceManagementBestPractices {
    
    /**
     * 1. {@code using}: acquires a JDBC connection on subscription, produces the
     * data and closes the connection when the stream terminates or is cancelled.
     * <p>
     * {@code Flux.using} is used so the result matches the declared {@code Flux}
     * return type; {@code Mono.using} yields a {@code Mono}.
     * <p>
     * NOTE(review): {@code filePath} is not used; despite the method name the
     * resource is a database connection.
     *
     * @param filePath currently unused
     * @return the data read while the connection was open
     */
    public Flux<String> readFileWithResourceManagement(String filePath) {
        return Flux.using(
            // Resource supplier
            () -> {
                try {
                    return DriverManager.getConnection("jdbc:postgresql://localhost:5432/db");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            },
            // Resource consumer: the (simulated) query runs on boundedElastic
            connection -> Mono.fromCallable(() -> {
                // Execute the query
                return "Data from database";
            }).subscribeOn(Schedulers.boundedElastic()),
            // Resource clean-up
            connection -> {
                try {
                    if (connection != null && !connection.isClosed()) {
                        connection.close();
                    }
                } catch (Exception e) {
                    logger.error("Error closing connection", e);
                }
            }
        );
    }
    
    /**
     * 2. {@code doFinally}: runs clean-up after completion, error or cancellation.
     *
     * @return a Mono that completes when all items have been processed
     */
    public Mono<Void> processWithCleanup() {
        return Flux.range(1, 10)
            .flatMap(i -> processItem(i))
            .doFinally(signalType -> {
                // Runs whether the stream succeeded or failed
                logger.info("Processing completed with signal: {}", signalType);
                cleanupResources();
            })
            .then();
    }
    
    /**
     * 3. {@code doOnCancel}: runs clean-up when the subscriber cancels.
     *
     * @return the processed items of a 100 ms interval source
     */
    public Flux<String> processWithCancelHandling() {
        return Flux.interval(java.time.Duration.ofMillis(100))
            .doOnCancel(() -> {
                logger.info("Stream was cancelled");
                cleanupResources();
            })
            .flatMap(this::processItem);
    }
    
    /**
     * 4. {@code doOnDiscard}: observes items that operators discard.
     * <p>
     * {@code doOnDiscard} needs the element type it should react to, so the hook is
     * registered for {@code Long}, the element type of the interval source. The
     * items are passed through {@code processItem}, as in the cancel example, so
     * the stream matches the declared {@code Flux<String>} return type.
     *
     * @return the processed items of a 100 ms interval source
     */
    public Flux<String> processWithDiscardHandling() {
        return Flux.interval(java.time.Duration.ofMillis(100))
            .onBackpressureDrop(dropped ->
                logger.warn("Item was dropped: {}", dropped)
            )
            .flatMap(this::processItem)
            .doOnDiscard(Long.class, discarded ->
                logger.info("Item was discarded: {}", discarded)
            );
    }
    
    /** Placeholder for the actual resource clean-up logic. */
    private void cleanupResources() {
        // Resource clean-up logic goes here
    }
}

4. 性能优化最佳实践

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Schedulers;
import java.time.Duration;

/**
 * Performance-oriented Reactor recipes: parallel rails, bounded concurrency, caching,
 * rate control and timeouts.
 *
 * <p>NOTE(review): {@code heavyComputation}, {@code processItem} and
 * {@code fetchDataFromDatabase} are not declared in this snippet; they are presumably
 * supplied by the surrounding project. Confirm their signatures.
 */
public class PerformanceOptimizationBestPractices {

    /** Memoized lookups per key, so repeated calls to {@link #getCachedData} share one result. */
    private final java.util.concurrent.ConcurrentMap<String, Mono<String>> dataCache =
        new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * 1. Spreads a CPU-bound mapping over the parallel scheduler.
     *
     * @return the mapped values; {@code sequential()} merges the rails back into one
     *         {@code Flux} and does NOT restore the original element order
     */
    public Flux<Integer> parallelProcessing() {
        return Flux.range(1, 1000)
            .parallel() // split into rails
            .runOn(Schedulers.parallel()) // run each rail on the parallel scheduler
            .map(this::heavyComputation)
            .sequential(); // merge rails back into a single Flux (order not guaranteed)
    }

    /**
     * 2. Processes items concurrently with {@code flatMap}; results arrive in completion order.
     *
     * @return one result per input item
     */
    public Flux<String> concurrentProcessing() {
        return Flux.range(1, 100)
            .flatMap(i ->
                Mono.fromCallable(() -> processItem(i))
                    .subscribeOn(Schedulers.boundedElastic())
            ); // default flatMap concurrency is 256
    }

    /**
     * 3. Same as {@link #concurrentProcessing()} but results are emitted in source order.
     *
     * @return one result per input item, ordered like the input
     */
    public Flux<String> orderedConcurrentProcessing() {
        return Flux.range(1, 100)
            .flatMapSequential(i ->
                Mono.fromCallable(() -> processItem(i))
                    .subscribeOn(Schedulers.boundedElastic())
            );
    }

    /**
     * 4. Returns the data for {@code key}, fetching it at most once per key.
     *
     * <p>Calling {@code cache()} on a freshly created {@code Mono} per invocation caches
     * nothing across calls, so the cached {@code Mono} itself is stored per key.
     * Note that {@code cache()} also replays an error; evict the entry if that is undesired.
     *
     * @param key lookup key, must not be {@code null}
     * @return a shared, replaying {@code Mono} for the key
     */
    public Mono<String> getCachedData(String key) {
        return dataCache.computeIfAbsent(key, k -> fetchDataFromDatabase(k).cache());
    }

    /**
     * 5. Replays a one-second tick stream to late subscribers, expiring each cached
     * element after {@code ttl}.
     *
     * <p>{@code Flux} has no {@code cacheInvalidateWith} operator; {@code cache(Duration)}
     * is the time-bounded replay it offers. The caller must keep and reuse the returned
     * instance, since every call of this method builds an independent cache.
     *
     * @param ttl how long each element stays replayable
     * @return the cached tick stream, rendered as text
     */
    public Flux<String> getStreamWithCache(Duration ttl) {
        return Flux.interval(Duration.ofSeconds(1))
            .map(String::valueOf)
            .cache(ttl);
    }

    /**
     * 6. Limits emission to roughly 20 elements per second.
     *
     * <p>{@code Flux.interval} cannot be slowed down by demand and fails with an overflow
     * error when the consumer lags, so surplus ticks are dropped before the delay.
     *
     * @return ticks spaced at least 50 ms apart
     */
    public Flux<Long> rateLimitedStream() {
        return Flux.interval(Duration.ofMillis(10))
            .onBackpressureDrop()
            .delayElements(Duration.ofMillis(50));
    }

    /**
     * 7. Bounds an otherwise infinite stream.
     *
     * @return the first 100 ticks
     */
    public Flux<Long> limitedStream() {
        return Flux.interval(Duration.ofMillis(100))
            .take(100);
    }

    /**
     * 8. Guards a slow blocking call with a 3-second timeout and a fallback value.
     *
     * <p>The blocking callable is moved to the bounded-elastic scheduler; without that it
     * would run on (and block) the subscribing thread for the full 5 seconds.
     *
     * @return {@code "Result"}, or {@code "Timeout"} when the call takes longer than 3 s
     */
    public Mono<String> withTimeout() {
        return Mono.fromCallable(() -> {
                Thread.sleep(5000);
                return "Result";
            })
            .subscribeOn(Schedulers.boundedElastic())
            .timeout(Duration.ofSeconds(3), Mono.just("Timeout"));
    }
}

5. 测试最佳实践

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.time.Duration;

/**
 * Examples of testing reactive pipelines with {@code StepVerifier}.
 */
public class TestingBestPractices {

    /** 1. Verifies the exact signal sequence of a {@code Mono}. */
    @Test
    public void testMonoWithStepVerifier() {
        Mono<String> mono = Mono.just("Test");

        StepVerifier.create(mono)
            .expectNext("Test")
            .expectComplete()
            .verify();
    }

    /**
     * 2. Tests a time-based pipeline without real waiting.
     *
     * <p>The publisher is created inside the supplier so that {@code Flux.interval} picks up
     * the virtual-time scheduler installed by {@code withVirtualTime}; no manual
     * {@code VirtualTimeScheduler} set-up or reset is needed.
     */
    @Test
    public void testWithVirtualTime() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
            .expectSubscription()
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(0L)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(1L)
            .expectNoEvent(Duration.ofSeconds(1))
            .expectNext(2L)
            .expectComplete()
            .verify();
    }

    /** 3. Verifies that an error signal of the expected type is emitted. */
    @Test
    public void testErrorHandling() {
        Mono<String> mono = Mono.error(new RuntimeException("Test error"));

        StepVerifier.create(mono)
            .expectError(RuntimeException.class)
            .verify();
    }

    /**
     * 4. Verifies that only the requested number of elements is delivered.
     *
     * <p>Only 10 of the 1000 elements are requested, so the source never completes; the
     * verifier cancels after the 10th element instead of waiting for a completion signal
     * that would never arrive.
     */
    @Test
    public void testBackpressure() {
        Flux<Integer> flux = Flux.range(1, 1000);

        StepVerifier.create(flux, 10) // initial request of 10 elements
            .expectNextCount(10)
            .thenCancel()
            .verify();
    }
}

6. 监控和日志最佳实践

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;

/**
 * Logging and lightweight monitoring hooks for reactive pipelines.
 *
 * <p>NOTE(review): {@code processItem} is not declared in this snippet; it is presumably
 * provided by the surrounding project.
 */
public class MonitoringAndLoggingBestPractices {

    private static final Logger logger = LoggerFactory.getLogger(MonitoringAndLoggingBestPractices.class);

    /**
     * 1. Logs a progress line for every tenth element handed to processing.
     *
     * @return the processing results
     */
    public Flux<String> processWithProgressLogging() {
        return Flux.range(1, 100)
            .doOnNext(i -> {
                if (i % 10 == 0) {
                    logger.info("Processed {} items", i);
                }
            })
            .flatMap(this::processItem);
    }

    /**
     * 2. Logs once when the whole stream completes successfully.
     *
     * @return a {@code Mono} signalling termination
     */
    public Mono<Void> processWithCompletionLogging() {
        return Flux.range(1, 100)
            .flatMap(this::processItem)
            .doOnComplete(() -> {
                logger.info("Processing completed successfully");
            })
            .then();
    }

    /**
     * 3. Logs a failure (with stack trace) without handling it; the error still propagates.
     *
     * <p>The explicit {@code <String>} type witness is required: a callable that only throws
     * would otherwise be inferred as {@code Mono<Object>}.
     *
     * @return a {@code Mono} that always fails with a {@code RuntimeException}
     */
    public Mono<String> processWithErrorLogging() {
        return Mono.<String>fromCallable(() -> {
            throw new RuntimeException("Test error");
        }).doOnError(error -> {
            logger.error("Processing failed", error);
        });
    }

    /**
     * 4. Attaches a request id to the subscriber context and reads it back.
     *
     * <p>Context flows from the subscriber upstream, so the reader
     * ({@code deferContextual}) must sit above the {@code contextWrite} call.
     *
     * @return a {@code Mono} emitting {@code "Result"}
     */
    public Mono<String> processWithContext() {
        return Mono.deferContextual(contextView -> {
                String requestId = contextView.getOrDefault("requestId", "unknown");
                logger.info("Processing request: {}", requestId);
                return Mono.fromCallable(() -> "Result");
            })
            .contextWrite(context -> context.put("requestId", "12345"));
    }

    /**
     * 5. Measures how long one subscription takes to process all items.
     *
     * <p>The start time is captured inside {@code Flux.defer} so that it is taken per
     * subscription, not when this method merely assembles the pipeline.
     *
     * @return the processing results
     */
    public Flux<String> processWithMetrics() {
        return Flux.defer(() -> {
            long startTime = System.currentTimeMillis();

            return Flux.range(1, 100)
                .flatMap(this::processItem)
                .doOnComplete(() -> {
                    long duration = System.currentTimeMillis() - startTime;
                    logger.info("Processed 100 items in {} ms", duration);
                });
        });
    }
}

7. 安全最佳实践

import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class SecurityBestPractices {
    
    // 1. 使用 @PreAuthorize 进行方法级安全控制
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        return userRepository.findAll();
    }
    
    // 2. 使用参数化查询防止 SQL 注入
    public Mono<User> findByUsername(String username) {
        // 使用参数化查询而不是字符串拼接
        return userRepository.findByUsername(username);
    }
    
    // 3. 验证输入参数
    public Mono<User> createUser(User user) {
        // 验证用户输入
        if (user.getUsername() == null || user.getUsername().isEmpty()) {
            return Mono.error(new ValidationException("Username is required"));
        }
        
        if (user.getEmail() == null || !isValidEmail(user.getEmail())) {
            return Mono.error(new ValidationException("Invalid email format"));
        }
        
        return userRepository.save(user);
    }
    
    // 4. 使用 switchIfEmpty 防止信息泄露
    public Mono<User> getUserById(Long id) {
        return userRepository.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException("User not found")));
    }
    
    // 5. 限制返回数据量
    public Flux<User> getUsers(int page, int size) {
        // 限制返回的数据量,防止大量数据传输
        if (size > 100) {
            size = 100;
        }
        
        return userRepository.findAll()
            .skip(page * size)
            .take(size);
    }
    
    private boolean isValidEmail(String email) {
        // 邮箱验证逻辑
        return email != null && email.matches("^[A-Za-z0-9+_.-]+@.*\\.[A-Za-z]{2,}$");
    }
}

8. 响应式编程常见陷阱

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Schedulers;
import java.time.Duration;

/**
 * Side-by-side examples of common Reactor mistakes (marked ❌) and their fixes (marked ✅).
 *
 * <p>The ❌ methods are intentionally left faulty. In the ✅ methods the blocking call is
 * placed directly in the {@code Callable}: a {@code Callable} may throw checked
 * exceptions, so an {@code InterruptedException} becomes an error signal instead of
 * being swallowed by {@code printStackTrace()}.
 *
 * <p>NOTE(review): {@code processItem} is not declared in this snippet.
 */
public class CommonPitfalls {

    // ❌ Wrong: blocking inside the subscribe callback
    public void wrongBlockingInSubscribe() {
        Flux.range(1, 10)
            .subscribe(i -> {
                try {
                    Thread.sleep(1000); // blocking call
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    }

    // ✅ Correct: wrap the blocking call and run it on a dedicated scheduler
    public void correctNonBlockingSubscribe() {
        Flux.range(1, 10)
            .flatMap(i -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return i;
            }).subscribeOn(Schedulers.boundedElastic()))
            .subscribe();
    }

    // ❌ Wrong: the pipeline is never subscribed
    public void wrongForgettingToSubscribe() {
        Mono.just("Hello")
            .map(String::toUpperCase); // no subscriber, so nothing runs
    }

    // ✅ Correct: subscribe to trigger execution
    public void correctWithSubscribe() {
        Mono.just("Hello")
            .map(String::toUpperCase)
            .subscribe(System.out::println);
    }

    // ❌ Wrong: blocking inside an operator of the reactive stream
    public Flux<String> wrongBlockingInStream() {
        return Flux.range(1, 10)
            .map(i -> {
                try {
                    Thread.sleep(1000); // blocking call
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Item " + i;
            });
    }

    // ✅ Correct: isolate the blocking call on the bounded-elastic scheduler
    public Flux<String> correctAsyncInStream() {
        return Flux.range(1, 10)
            .flatMap(i -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return "Item " + i;
            }).subscribeOn(Schedulers.boundedElastic()));
    }

    // ❌ Wrong: unbounded flatMap, concurrency may become too high
    public Flux<String> wrongExcessiveFlatMap() {
        return Flux.range(1, 10000)
            .flatMap(i -> Mono.fromCallable(() -> processItem(i))); // concurrency not limited
    }

    // ✅ Correct: cap the number of in-flight inner publishers
    public Flux<String> correctControlledConcurrency() {
        return Flux.range(1, 10000)
            .flatMap(i ->
                Mono.fromCallable(() -> processItem(i))
                    .subscribeOn(Schedulers.boundedElastic()),
                10 // at most 10 concurrent inner subscriptions
            );
    }

    // ❌ Wrong: no error handling
    public Flux<String> wrongIgnoringErrors() {
        return Flux.range(1, 10)
            .map(i -> {
                if (i == 5) {
                    throw new RuntimeException("Error");
                }
                return "Item " + i;
            }); // the stream terminates with the error at element 5
    }

    // ✅ Correct: recover from the error with a fallback publisher
    public Flux<String> correctWithErrorHandling() {
        return Flux.range(1, 10)
            .map(i -> {
                if (i == 5) {
                    throw new RuntimeException("Error");
                }
                return "Item " + i;
            })
            .onErrorResume(error -> Flux.just("Error: " + error.getMessage()));
    }
}

9. 响应式编程设计模式

观察者模式(Observer Pattern)

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * Observer pattern built on a multicast Reactor sink: publishers push events in,
 * observers receive them through the {@code Flux} view.
 */
public class ObserverPattern {

    /** Multicast sink with a back-pressure buffer that fans events out to subscribers. */
    private final Sinks.Many<String> eventSink =
        Sinks.many()
            .multicast()
            .onBackpressureBuffer();

    /**
     * Publishes one event to the sink.
     *
     * <p>The {@code EmitResult} returned by {@code tryEmitNext} is not inspected, so a
     * failed emission goes unnoticed by the caller.
     *
     * @param event the event payload
     */
    public void publishEvent(String event) {
        this.eventSink.tryEmitNext(event);
    }

    /**
     * Exposes the sink as a stream for observers.
     *
     * @return the {@code Flux} view of the sink
     */
    public Flux<String> subscribeToEvents() {
        Flux<String> events = this.eventSink.asFlux();
        return events;
    }
}

策略模式(Strategy Pattern)

import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

/**
 * Strategy pattern with reactive return types: each strategy transforms a string and
 * wraps the result in a {@code Mono}.
 */
public class StrategyPattern {

    /** A string transformation exposed as a reactive operation. */
    public interface ProcessingStrategy {
        /**
         * @param input the text to transform
         * @return a {@code Mono} emitting the transformed text
         */
        Mono<String> process(String input);
    }

    /** Upper-cases the input. */
    public static class UppercaseStrategy implements ProcessingStrategy {
        @Override
        public Mono<String> process(String input) {
            // Locale.ROOT keeps the result independent of the JVM default locale
            // (e.g. the Turkish dotted/dotless "i" rules).
            return Mono.just(input.toUpperCase(java.util.Locale.ROOT));
        }
    }

    /** Lower-cases the input. */
    public static class LowercaseStrategy implements ProcessingStrategy {
        @Override
        public Mono<String> process(String input) {
            return Mono.just(input.toLowerCase(java.util.Locale.ROOT));
        }
    }

    /** Reverses the input. */
    public static class ReverseStrategy implements ProcessingStrategy {
        @Override
        public Mono<String> process(String input) {
            return Mono.just(new StringBuilder(input).reverse().toString());
        }
    }

    /**
     * Strategy factory.
     *
     * @param type one of {@code "uppercase"}, {@code "lowercase"}, {@code "reverse"}
     * @return a new strategy instance for the given type
     * @throws IllegalArgumentException if {@code type} is not a known strategy name
     * @throws NullPointerException if {@code type} is {@code null}
     */
    public static ProcessingStrategy getStrategy(String type) {
        switch (type) {
            case "uppercase":
                return new UppercaseStrategy();
            case "lowercase":
                return new LowercaseStrategy();
            case "reverse":
                return new ReverseStrategy();
            default:
                throw new IllegalArgumentException("Unknown strategy: " + type);
        }
    }
}

责任链模式(Chain of Responsibility)

import reactor.core.publisher.Mono;

/**
 * Chain of responsibility with reactive handlers: each handler either answers the
 * request itself or delegates to the next handler.
 */
public class ChainOfResponsibility {

    /** One link of the chain. */
    public interface Handler {
        /**
         * @param request the raw request string
         * @return a {@code Mono} emitting this handler's (or a later handler's) answer
         */
        Mono<String> handle(String request);
    }

    /** Answers every request that starts with {@code "auth:"}; delegates the rest. */
    public static class AuthenticationHandler implements Handler {
        private final Handler next;

        public AuthenticationHandler(Handler next) {
            this.next = java.util.Objects.requireNonNull(next, "next");
        }

        @Override
        public Mono<String> handle(String request) {
            if (request.startsWith("auth:")) {
                return Mono.just("Authenticated: " + request.substring(5));
            }
            return next.handle(request);
        }
    }

    /** Answers every request that starts with {@code "auth:admin"}; delegates the rest. */
    public static class AuthorizationHandler implements Handler {
        private final Handler next;

        public AuthorizationHandler(Handler next) {
            this.next = java.util.Objects.requireNonNull(next, "next");
        }

        @Override
        public Mono<String> handle(String request) {
            if (request.startsWith("auth:admin")) {
                return Mono.just("Authorized: " + request);
            }
            return next.handle(request);
        }
    }

    /** Terminal handler: prints the request and answers with a "Logged" message. */
    public static class LoggingHandler implements Handler {
        @Override
        public Mono<String> handle(String request) {
            System.out.println("Logging request: " + request);
            return Mono.just("Logged: " + request);
        }
    }

    /**
     * Builds the chain authorization → authentication → logging.
     *
     * <p>The handler with the more specific prefix ({@code "auth:admin"}) must come before
     * the one with the broader prefix ({@code "auth:"}). In the opposite order every
     * {@code "auth:admin…"} request is already consumed by the authentication handler and
     * the authorization branch can never run.
     *
     * @return the head of the chain
     */
    public static Handler buildChain() {
        Handler loggingHandler = new LoggingHandler();
        Handler authenticationHandler = new AuthenticationHandler(loggingHandler);
        Handler authorizationHandler = new AuthorizationHandler(authenticationHandler);

        return authorizationHandler;
    }
}

通过遵循这些最佳实践,可以构建高性能、可维护、可靠的响应式应用程序。响应式编程虽然有一定的学习曲线,但掌握这些技巧后,能够充分发挥其优势,构建出卓越的应用系统。

12. 常见问题和解决方案

12.1 基础问题

Q1: 为什么我的响应式代码没有执行?

问题描述:创建了 Mono 或 Flux,但代码没有执行任何操作。

原因:响应式流是懒加载的,只有在订阅时才会开始执行。

解决方案

// ❌ 错误:没有订阅
Mono.just("Hello")
    .map(String::toUpperCase);

// ✅ 正确:添加订阅
Mono.just("Hello")
    .map(String::toUpperCase)
    .subscribe(System.out::println);

// ✅ 正确:在 WebFlux 中,框架会自动订阅
/**
 * Returns an upper-cased greeting. No explicit {@code subscribe()} call is made here;
 * the returned {@code Mono} is handed back to the caller.
 *
 * @return a {@code Mono} emitting {@code "HELLO"}
 */
@GetMapping("/hello")
public Mono<String> hello() {
    Mono<String> greeting = Mono.just("Hello");
    return greeting.map(text -> text.toUpperCase());
}

Q2: 如何在响应式流中处理阻塞操作?

问题描述:在响应式流中调用阻塞方法导致整个应用被阻塞。

原因:响应式流运行在事件循环线程上,阻塞操作会阻塞整个线程。

解决方案

// ❌ Wrong: blocking directly inside an operator (kept faulty on purpose)
Flux.range(1, 10)
    .map(i -> {
        try {
            Thread.sleep(1000); // blocking call
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    });

// ✅ Correct: wrap the blocking call in a Callable and run it on boundedElastic.
// A Callable may throw checked exceptions, so an InterruptedException surfaces as an
// error signal instead of being swallowed by printStackTrace().
Flux.range(1, 10)
    .flatMap(i -> Mono.fromCallable(() -> {
        Thread.sleep(1000);
        return i;
    }).subscribeOn(Schedulers.boundedElastic()));

// ✅ 正确:对于数据库操作,使用响应式驱动
/**
 * Reactive repository for {@code User} entities keyed by {@code Long}.
 * Declares no methods of its own; everything comes from {@code R2dbcRepository}.
 */
@Repository
public interface UserRepository extends R2dbcRepository<User, Long> {
    // Reactive database access; no manual handling of blocking calls is needed
}

// ✅ 正确:对于必须使用阻塞 API 的情况
/**
 * Wraps a blocking service call so that it executes on the bounded-elastic scheduler.
 *
 * @return a {@code Mono} emitting the result of {@code blockingService.call()}
 */
public Mono<String> callBlockingApi() {
    Mono<String> blockingCall = Mono.fromCallable(() -> blockingService.call());
    return blockingCall.subscribeOn(Schedulers.boundedElastic());
}

Q3: 为什么我的 Mono/Flux 只执行一次?

问题描述:多次订阅同一个 Mono/Flux,但只执行了一次。

原因:默认情况下,Mono/Flux 是冷序列,每次订阅都会重新执行。但如果使用了 cache 或其他操作,可能会改变行为。

解决方案

// Cold sequence: the callable runs again for every subscription
Mono<String> coldMono = Mono.fromCallable(() -> {
    System.out.println("Executing...");
    return "Result";
});

coldMono.subscribe(); // prints: Executing...
coldMono.subscribe(); // prints: Executing...

// Cached sequence: the first subscription runs the callable, later ones replay the result
Mono<String> hotMono = coldMono.cache();

hotMono.subscribe(); // prints: Executing...
hotMono.subscribe(); // prints nothing

// 如果需要每次订阅都执行,不要使用 cache
// 如果需要缓存结果,使用 cache()

12.2 错误处理问题

Q4: 如何处理响应式流中的异常?

问题描述:响应式流中的异常导致整个流终止,无法继续处理。

解决方案

// 方案1: 使用 onErrorResume 提供降级方案
/**
 * Option 1: resolves a user's name and falls back to a substitute value on errors.
 *
 * <p>The user is mapped to its name before the fallbacks are applied; without that step
 * the {@code Mono<User>} from the repository cannot be combined with the
 * {@code Mono<String>} fallbacks or returned as {@code Mono<String>}.
 * NOTE(review): an absent user only reaches the first fallback if the repository
 * signals {@code UserNotFoundException}; an empty result stays empty.
 *
 * @param id the user id
 * @return the user's name, {@code "Default User"} for a {@code UserNotFoundException},
 *         or an {@code "Error: …"} text for any other exception
 */
public Mono<String> getUserWithFallback(Long id) {
    return userRepository.findById(id)
        .map(User::getName)
        .onErrorResume(UserNotFoundException.class, ex -> {
            // Handling for the specific exception
            return Mono.just("Default User");
        })
        .onErrorResume(Exception.class, ex -> {
            // Handling for every other exception
            return Mono.just("Error: " + ex.getMessage());
        });
}

// 方案2: 使用 onErrorReturn 返回默认值
/**
 * Option 2: lists all user names and ends the stream with a default value on any error.
 *
 * <p>{@code onErrorReturn} has no {@code (String, String)} overload on a
 * {@code Flux<String>}; its two-argument forms take an exception class or a predicate
 * first. The single-argument form covers every error.
 *
 * @return the user names, followed by {@code "Unknown User"} if the source fails
 */
public Flux<String> getAllUsersWithDefault() {
    return userRepository.findAll()
        .map(User::getName)
        .onErrorReturn("Unknown User");
}

// 方案3: 使用 retry 进行重试
/**
 * Option 3: retries the lookup with exponential backoff, but only for failures that are
 * {@code TimeoutException} or {@code NetworkException}.
 *
 * @param id the user id
 * @return the user; other errors, or exhausted retries, propagate to the subscriber
 */
public Mono<User> getUserWithRetry(Long id) {
    // Up to 3 retries, starting at a 100 ms backoff.
    Retry transientFailuresOnly = Retry.backoff(3, Duration.ofMillis(100))
        .filter(failure ->
            failure instanceof TimeoutException || failure instanceof NetworkException);

    Mono<User> lookup = userRepository.findById(id);
    return lookup.retryWhen(transientFailuresOnly);
}

// 方案4: 使用 onErrorContinue 继续处理后续元素
/**
 * Option 4: keeps processing the remaining elements when a single item fails.
 *
 * <p>Recovery is done per item inside {@code flatMap} rather than with
 * {@code onErrorContinue}, which only works when every upstream operator supports it and
 * which the Reactor documentation describes as a specialist operator. {@code Flux.defer}
 * also turns an exception thrown directly by {@code processItem} into an error signal
 * that the inner recovery can handle.
 *
 * @return the results of all items that were processed successfully
 */
public Flux<String> processWithErrorContinue() {
    return Flux.range(1, 10)
        .flatMap(i -> Flux.defer(() -> processItem(i))
            .onErrorResume(error -> {
                logger.error("Error processing item {}: {}", i, error.getMessage());
                return Flux.empty(); // skip the failed item
            })
        );
}

Q5: 如何捕获并记录所有错误?

问题描述:需要记录响应式流中发生的所有错误,包括中间步骤的错误。

解决方案

/**
 * Logs progress, per-item failures, stream failure and successful completion.
 *
 * <p>The throwable is passed as the last logger argument so that SLF4J records the stack
 * trace; logging only {@code getMessage()} loses the failure's origin.
 *
 * @return a {@code Mono} signalling termination of the processing
 */
public Mono<Void> processWithFullErrorLogging() {
    return Flux.range(1, 10)
        .doOnNext(i -> logger.info("Processing item: {}", i))
        .flatMap(i -> processItem(i)
            .doOnError(error ->
                logger.error("Error processing item {}: {}", i, error.getMessage(), error)
            )
        )
        .doOnError(error ->
            logger.error("Stream processing failed: {}", error.getMessage(), error)
        )
        .doOnComplete(() ->
            logger.info("Stream processing completed successfully")
        )
        .then();
}

// 使用 context 传递请求 ID 以便跟踪
/**
 * Tags the subscription with a random request id and includes it in the error log.
 *
 * <p>The id is read from the Reactor context through {@code deferContextual}. The
 * previous version read {@code error.getSuppressed()[0]}, which never holds the request
 * id and throws {@code ArrayIndexOutOfBoundsException} when there is no suppressed
 * exception.
 *
 * @return a {@code Mono} emitting {@code "Result"}
 */
public Mono<String> processWithContext() {
    return Mono.deferContextual(contextView -> {
            String requestId = contextView.getOrDefault("requestId", "unknown");
            return Mono.fromCallable(() -> "Result")
                .doOnError(error ->
                    logger.error("Request {} failed: {}", requestId, error.getMessage()));
        })
        // Written downstream of the reader: context travels from the subscriber upwards.
        .contextWrite(context -> context.put("requestId", UUID.randomUUID().toString()));
}

12.3 背压问题

Q6: 如何处理快速生产者和慢速消费者?

问题描述:生产者产生数据的速度快于消费者处理速度,导致内存溢出或系统崩溃。

解决方案

// 方案1: 使用 onBackpressureBuffer 缓冲数据
/**
 * Option 1: buffers up to 100 pending ticks of a 10 ms interval before processing them.
 *
 * @return the results of {@code processItem}, subscribed on the bounded-elastic scheduler
 */
public Flux<String> processWithBuffer() {
    Flux<Long> bufferedTicks = Flux.interval(Duration.ofMillis(10))
        .onBackpressureBuffer(100); // hold at most 100 undelivered elements

    Flux<String> processed = bufferedTicks.flatMap(this::processItem);
    return processed.subscribeOn(Schedulers.boundedElastic());
}

// 方案2: 使用 onBackpressureDrop 丢弃多余数据
/**
 * Option 2: drops ticks that arrive without downstream demand and logs each dropped one.
 *
 * @return the results of {@code processItem} for the ticks that were not dropped
 */
public Flux<String> processWithDrop() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(10));
    Flux<Long> survivingTicks = ticks.onBackpressureDrop(dropped ->
        logger.warn("Dropped item: {}", dropped));
    return survivingTicks.flatMap(this::processItem);
}

// 方案3: 使用 onBackpressureLatest 保留最新值
/**
 * Option 3: keeps only the most recent tick while the downstream has no demand.
 *
 * @return the results of {@code processItem} for the ticks that were delivered
 */
public Flux<String> processWithLatest() {
    Flux<Long> latestTicks = Flux.interval(Duration.ofMillis(10)).onBackpressureLatest();
    return latestTicks.flatMap(this::processItem);
}

// 方案4: 使用 limitRate 控制处理速率
/**
 * Option 4: caps the size of the demand batches sent upstream.
 *
 * <p>{@code limitRate(100)} bounds how many elements are requested from upstream at a
 * time; it is a prefetch limit, not a per-second throughput limit.
 *
 * @return the results of {@code processItem} for every tick
 */
public Flux<String> processWithRateLimit() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(10));
    Flux<Long> batchedDemand = ticks.limitRate(100); // request upstream in batches of at most 100
    return batchedDemand.flatMap(this::processItem);
}

// 方案5: 使用 sample 降频处理
/**
 * Option 5: reduces the frequency by sampling the 10 ms interval once per 100 ms window.
 *
 * @return the results of {@code processItem} for the sampled ticks
 */
public Flux<String> processWithSampling() {
    Duration samplingWindow = Duration.ofMillis(100);
    Flux<Long> sampledTicks = Flux.interval(Duration.ofMillis(10)).sample(samplingWindow);
    return sampledTicks.flatMap(this::processItem);
}

Q7: 如何调试背压问题?

问题描述:不确定是否发生了背压问题,需要调试。

解决方案

/**
 * Instruments a 10 ms interval source with logging for demand, emissions, cancellation,
 * completion and errors, so back-pressure behaviour becomes visible in the log.
 *
 * @return the results of {@code processItem} for every tick
 */
public Flux<String> debugBackpressure() {
    Flux<Long> instrumentedTicks = Flux.interval(Duration.ofMillis(10))
        .doOnRequest(demand -> logger.info("Requested {} items", demand))
        .doOnNext(tick -> logger.info("Emitted item: {}", tick))
        .doOnCancel(() -> logger.warn("Stream was cancelled"))
        .doOnComplete(() -> logger.info("Stream completed"))
        .doOnError(failure -> logger.error("Stream error: {}", failure.getMessage()));

    return instrumentedTicks.flatMap(this::processItem);
}

// 使用 Reactor Debug 模式
// 在应用启动时调用 Hooks.onOperatorDebug();生产环境建议改用 reactor-tools 提供的 ReactorDebugAgent.init()

12.4 性能问题

Q8: 如何提高响应式应用的性能?

问题描述:响应式应用性能不如预期,需要优化。

解决方案

// 1. 使用并行处理
/**
 * 1. Maps 1..1000 through {@code heavyComputation} on parallel rails and merges the
 * rails back into a single {@code Flux}.
 *
 * @return the computed values
 */
public Flux<Integer> parallelProcessing() {
    Flux<Integer> inputs = Flux.range(1, 1000);
    return inputs
        .parallel()
        .runOn(Schedulers.parallel())
        .map(value -> heavyComputation(value))
        .sequential();
}

// 2. 控制并发度
/**
 * 2. Processes 100 items on the bounded-elastic scheduler with at most 10 in flight.
 *
 * @return one result per item, in completion order
 */
public Flux<String> controlledConcurrency() {
    final int maxInFlight = 10; // flatMap concurrency limit

    return Flux.range(1, 100)
        .flatMap(
            item -> Mono.fromCallable(() -> processItem(item))
                .subscribeOn(Schedulers.boundedElastic()),
            maxInFlight);
}

// 3. 使用缓存
/** Memoized lookups per key, shared by all calls of {@link #getCachedData(String)}. */
private final java.util.concurrent.ConcurrentMap<String, Mono<String>> cachedDataByKey =
    new java.util.concurrent.ConcurrentHashMap<>();

/**
 * 3. Returns the data for {@code key}, querying the database at most once per key.
 *
 * <p>Calling {@code cache()} on a {@code Mono} that is created anew on every invocation
 * shares nothing between calls, so the cached {@code Mono} itself is kept per key.
 * Note that {@code cache()} also replays an error; evict the entry if that is undesired.
 *
 * @param key lookup key, must not be {@code null}
 * @return a shared, replaying {@code Mono} for the key
 */
public Mono<String> getCachedData(String key) {
    return cachedDataByKey.computeIfAbsent(key, k -> fetchDataFromDatabase(k).cache());
}

// 4. 使用连接池
/**
 * 4. Builds a {@code WebClient} backed by a custom connection pool named "custom" with
 * up to 500 connections and a 20-second idle limit.
 *
 * @return the client, pre-configured with the base URL {@code https://api.example.com}
 */
@Bean
public WebClient optimizedWebClient() {
    ConnectionProvider.Builder poolSettings = ConnectionProvider.builder("custom")
        .maxIdleTime(Duration.ofSeconds(20))
        .maxConnections(500);
    ConnectionProvider pool = poolSettings.build();

    ReactorClientHttpConnector connector =
        new ReactorClientHttpConnector(HttpClient.create(pool));

    return WebClient.builder()
        .baseUrl("https://api.example.com")
        .clientConnector(connector)
        .build();
}

// 5. 避免不必要的操作
/**
 * 5. Filters before mapping so that {@code processItem} only runs for even numbers.
 *
 * @return the processed even numbers of 1..1000
 */
public Flux<String> optimizedStream() {
    Flux<Integer> evens = Flux.range(1, 1000)
        .filter(number -> (number & 1) == 0); // discard odd values first
    return evens.map(this::processItem);
}

Q9: 如何避免内存泄漏?

问题描述:响应式应用出现内存泄漏,内存占用持续增长。

解决方案

// 1. 使用 take 避免无限流
/**
 * 1. Bounds an otherwise infinite interval stream.
 *
 * @return the first 100 ticks of a 100 ms interval
 */
public Flux<Long> limitedStream() {
    Flux<Long> endlessTicks = Flux.interval(Duration.ofMillis(100));
    return endlessTicks.take(100);
}

// 2. 使用 timeout 避免长时间阻塞
/**
 * 2. Guards a slow blocking call with a 3-second timeout and a fallback value.
 *
 * <p>The blocking callable is moved to the bounded-elastic scheduler; without that it
 * would run on (and block) the subscribing thread for the full 5 seconds, even though
 * the fallback is emitted after 3.
 *
 * @return {@code "Result"}, or {@code "Timeout"} when the call takes longer than 3 s
 */
public Mono<String> withTimeout() {
    return Mono.fromCallable(() -> {
            Thread.sleep(5000);
            return "Result";
        })
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(Duration.ofSeconds(3), Mono.just("Timeout"));
}

// 3. 正确处理资源释放
/**
 * 3. Ties a resource's lifetime to the stream that uses it.
 *
 * <p>{@code Flux.using} is used because the method returns a {@code Flux}; the previous
 * {@code Mono.using} call produced a {@code Mono}, which is not assignable to the
 * declared return type.
 *
 * @return the elements produced by {@code processWithResource}; the resource is released
 *         through {@code releaseResource} when the stream terminates or is cancelled
 */
public Flux<String> processWithResourceCleanup() {
    return Flux.using(
        () -> acquireResource(),
        resource -> processWithResource(resource),
        this::releaseResource
    );
}

// 4. 使用 doFinally 确保清理
/**
 * 4. Processes 1..10 and calls {@code cleanupResources()} from a {@code doFinally}
 * callback attached to the whole pipeline.
 *
 * @return a {@code Mono} signalling termination of the processing
 */
public Mono<Void> processWithCleanup() {
    Flux<Integer> inputs = Flux.range(1, 10);
    return inputs
        .flatMap(item -> processItem(item))
        .doFinally(signal -> cleanupResources())
        .then();
}

// 5. 避免在 doOnNext/doOnError 中持有引用
/**
 * 5. Shows a side-effect callback that uses each element without storing it.
 *
 * @return the processed items, each also printed to standard output
 */
public Flux<String> avoidMemoryLeak() {
    Flux<String> processed = Flux.range(1, 1000).flatMap(this::processItem);
    return processed.doOnNext(element -> {
        // Use the element and let it go: do not stash it in a longer-lived structure.
        System.out.println(element);
    });
}

12.5 线程和调度问题

Q10: 如何正确使用 Scheduler?

问题描述:不清楚何时使用哪个 Scheduler,导致性能问题或线程安全问题。

解决方案

// Schedulers.parallel(): CPU 密集型任务
/**
 * {@code Schedulers.parallel()}: runs a CPU-bound mapping on parallel rails.
 *
 * @return the computed values after the rails are merged again
 */
public Flux<Integer> cpuIntensiveTask() {
    Flux<Integer> inputs = Flux.range(1, 1000);
    return inputs
        .parallel()
        .runOn(Schedulers.parallel()) // parallel scheduler for the rails
        .map(this::heavyComputation)
        .sequential();
}

// Schedulers.boundedElastic(): I/O 密集型任务
/**
 * {@code Schedulers.boundedElastic()}: runs each I/O operation on the elastic scheduler.
 *
 * @return one result per input
 */
public Flux<String> ioIntensiveTask() {
    Flux<Integer> inputs = Flux.range(1, 100);
    return inputs.flatMap(input ->
        Mono.fromCallable(() -> ioOperation(input))
            .subscribeOn(Schedulers.boundedElastic())); // elastic scheduler for I/O work
}

// Schedulers.single(): 单线程顺序执行
/**
 * {@code Schedulers.single()}: doubles each value downstream of a switch to the
 * single-threaded scheduler.
 *
 * @return the values 2, 4, …, 20
 */
public Flux<Integer> singleThreadTask() {
    Flux<Integer> onSingleThread = Flux.range(1, 10).publishOn(Schedulers.single());
    return onSingleThread.map(value -> value * 2);
}

// Schedulers.immediate(): 当前线程执行
/**
 * {@code Schedulers.immediate()}: publishes on the immediate scheduler, i.e. without
 * hopping to another thread.
 *
 * @return a {@code Mono} emitting {@code "Hello"}
 */
public Mono<String> immediateExecution() {
    Mono<String> greeting = Mono.just("Hello");
    return greeting.publishOn(Schedulers.immediate());
}

// 自定义线程池
/**
 * Custom thread pool: runs each item on a dedicated fixed pool of 10 threads.
 *
 * <p>The pool is created per subscription and shut down when the stream terminates or is
 * cancelled. The previous version created a new pool on every call and never shut it
 * down, leaking 10 threads per invocation.
 *
 * @return one result per input
 */
public Flux<String> customThreadPool() {
    return Flux.using(
        () -> Executors.newFixedThreadPool(10),
        executor -> Flux.range(1, 100)
            .flatMap(i ->
                Mono.fromCallable(() -> processItem(i))
                    .subscribeOn(Schedulers.fromExecutor(executor))
            ),
        ExecutorService::shutdown
    );
}

Q11: 如何避免线程安全问题?

问题描述:在响应式流中访问共享状态导致线程安全问题。

解决方案

// ❌ 错误:共享可变状态
/**
 * ❌ Wrong (intentionally): several rails mutate the same unsynchronized counter.
 *
 * <p>The counter is a plain {@code int} cell, so the increments race and updates can be
 * lost. The previous version used {@code AtomicInteger}, which is thread-safe and thus
 * did not show the problem, and it returned a {@code ParallelFlux} where a {@code Flux}
 * is declared.
 *
 * @return counter snapshots, possibly with duplicates or gaps because of the data race
 */
public Flux<Integer> wrongSharedState() {
    int[] counter = {0};
    return Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(i -> ++counter[0]) // data race: not thread-safe
        .sequential();
}

// ✅ 正确:使用线程安全的数据结构
/**
 * ✅ Correct: shared state is kept in a thread-safe counter.
 *
 * <p>{@code AtomicInteger} is used so the mapped values are {@code Integer}, matching
 * the return type ({@code AtomicLong} yields {@code Long}), and {@code sequential()}
 * converts the {@code ParallelFlux} back into the declared {@code Flux}.
 *
 * @return the values 1..100, each exactly once, in no guaranteed order
 */
public Flux<Integer> correctThreadSafeState() {
    AtomicInteger counter = new AtomicInteger(0);
    return Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(i -> counter.incrementAndGet()) // atomic increment is thread-safe
        .sequential();
}

// ✅ 正确:避免共享状态
/**
 * ✅ Correct: no shared state at all; each element is transformed independently.
 *
 * <p>{@code sequential()} is required to turn the {@code ParallelFlux} back into the
 * declared {@code Flux}.
 *
 * @return the values 2..101, in no guaranteed order
 */
public Flux<Integer> avoidSharedState() {
    return Flux.range(1, 100)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(i -> i + 1) // depends only on the element itself
        .sequential();
}

// ✅ 正确:使用 context 传递上下文信息
/**
 * ✅ Correct: passes per-request data through the Reactor context instead of shared
 * fields. A random UUID is written under the key {@code "requestId"} and read back by
 * the deferred publisher.
 *
 * @return a {@code Mono} emitting {@code "Processing request: <uuid>"}
 */
public Mono<String> useContext() {
    Mono<String> contextReader = Mono.deferContextual(view -> {
        String id = view.get("requestId");
        return Mono.just("Processing request: " + id);
    });
    return contextReader.contextWrite(ctx -> ctx.put("requestId", UUID.randomUUID().toString()));
}

12.6 测试问题

Q12: 如何测试响应式代码?

问题描述:不知道如何正确测试响应式代码。

解决方案

// 1. 使用 StepVerifier 测试 Mono
/** 1. Verifies that the {@code Mono} emits {@code "Hello"} and then completes. */
@Test
public void testMono() {
    StepVerifier.create(Mono.just("Hello"))
        .expectNext("Hello")
        .verifyComplete();
}

// 2. 使用 StepVerifier 测试 Flux
/** 2. Verifies that the {@code Flux} emits 1 through 5 in order and then completes. */
@Test
public void testFlux() {
    StepVerifier.create(Flux.range(1, 5))
        .expectNext(1, 2, 3, 4, 5)
        .verifyComplete();
}

// 3. 测试错误场景
/** 3. Verifies that the {@code Mono} terminates with a {@code RuntimeException}. */
@Test
public void testError() {
    Mono<String> failing = Mono.error(new RuntimeException("Test error"));

    StepVerifier.create(failing)
        .verifyError(RuntimeException.class);
}

// 4. 使用虚拟时间测试时间相关操作
/**
 * 4. Tests a time-based pipeline without real waiting.
 *
 * <p>The publisher is created inside the supplier so that {@code Flux.interval} picks up
 * the virtual-time scheduler installed by {@code withVirtualTime}; no manual
 * {@code VirtualTimeScheduler} set-up or reset is needed.
 */
@Test
public void testWithVirtualTime() {
    StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(0L)
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(1L)
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(2L)
        .expectComplete()
        .verify();
}

// 5. 使用 WebTestClient 测试 WebFlux
@SpringBootTest
@AutoConfigureWebTestClient
public class WebFluxTest {
    
    @Autowired
    private WebTestClient webTestClient;
    
    @Test
    public void testGetUser() {
        webTestClient.get()
            .uri("/api/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(User.class);
    }
}

12.7 数据库问题

Q13: 如何在响应式应用中使用事务?

问题描述:需要在响应式应用中管理数据库事务。

解决方案

// 1. 声明式事务
/**
 * Declarative transaction example.
 *
 * <p>NOTE(review): {@code userRepository} and {@code orderRepository} are not declared in
 * this snippet; they are presumably injected fields of the real class.
 */
@Service
public class UserService {

    /**
     * Saves the user, then saves the order linked to the saved user's id, all inside
     * one transaction.
     *
     * <p>{@code thenReturn(savedUser)} is chained inside the {@code flatMap} lambda,
     * because {@code savedUser} is only in scope there.
     *
     * @param user the user to persist
     * @param order the order to persist; its user id is set from the saved user
     * @return the saved user, emitted after the order has been saved
     */
    @Transactional
    public Mono<User> createUserWithOrder(User user, Order order) {
        return userRepository.save(user)
            .flatMap(savedUser -> {
                order.setUserId(savedUser.getId());
                return orderRepository.save(order)
                    .thenReturn(savedUser);
            });
    }
}

// 2. 编程式事务
@Service
public class TransactionService {
    
    private final TransactionalOperator rxtx;
    
    public Mono<Void> transferMoney(Long fromId, Long toId, Double amount) {
        return userRepository.findById(fromId)
            .flatMap(fromUser -> userRepository.findById(toId)
                .flatMap(toUser -> {
                    fromUser.setBalance(fromUser.getBalance() - amount);
                    toUser.setBalance(toUser.getBalance() + amount);
                    
                    return userRepository.save(fromUser)
                        .then(userRepository.save(toUser));
                })
            )
            .as(rxtx::transactional) // 应用事务
            .then();
    }
}

// 3. 单个事务中的多步操作(注意:该示例并未开启嵌套事务,所有步骤共用同一个事务)
/**
 * Saves a user, then an order linked to that user, then processes a payment linked to
 * that order. The method carries a single {@code @Transactional} annotation.
 *
 * @param user the user to save
 * @param order the order to save; receives the saved user's id
 * @param payment the payment to process; receives the saved order's id
 * @return a {@code Mono} completing after the payment step
 */
@Transactional
public Mono<Void> complexOperation(User user, Order order, Payment payment) {
    return userRepository.save(user)
        .flatMap(persistedUser -> {
            order.setUserId(persistedUser.getId());
            return orderRepository.save(order);
        })
        .flatMap(persistedOrder -> {
            payment.setOrderId(persistedOrder.getId());
            return processPayment(payment);
        })
        .then();
}

Q14: 如何处理数据库连接池耗尽?

问题描述:高并发情况下数据库连接池耗尽。

解决方案

// 1. 配置合适的连接池大小
@Configuration
public class R2dbcPoolConfig extends AbstractR2dbcConfiguration {
    
    @Override
    protected String connectionUrl() {
        return "r2dbc:postgresql://localhost:5432/reactor_db" +
            "?poolSize=50" + // 根据并发量调整
            "&maxIdleTime=PT30S" +
            "&maxLifeTime=PT10M";
    }
}

// 2. 使用背压控制数据库访问速率
/**
 * 2. Streams all users through a 100-element buffer and requests them from upstream in
 * batches of at most 50.
 *
 * @return all users
 */
public Flux<User> getUsersWithBackpressure() {
    Flux<User> bufferedUsers = userRepository.findAll().onBackpressureBuffer(100);
    return bufferedUsers.limitRate(50); // upstream demand in batches of 50
}

// 3. 使用超时避免长时间占用连接
/**
 * 3. Looks a user up and gives up after 5 seconds.
 *
 * @param id the user id
 * @return the user, or an empty {@code Mono} when the lookup exceeds the timeout
 */
public Mono<User> getUserWithTimeout(Long id) {
    Duration limit = Duration.ofSeconds(5);
    Mono<User> lookup = userRepository.findById(id);
    return lookup.timeout(limit, Mono.empty());
}

// 4. 监控连接池使用情况
/**
 * 4. Periodically logs connection-pool usage.
 *
 * <p>NOTE(review): {@code logger}, {@code getActiveConnections()} and
 * {@code getIdleConnections()} are not declared in this snippet; confirm where they
 * come from.
 */
@Component
public class ConnectionPoolMonitor {
    
    /** Logs the active and idle connection counts; scheduled at a fixed rate of 60000 ms. */
    @Scheduled(fixedRate = 60000)
    public void monitorPool() {
        // Report connection pool metrics
        logger.info("Active connections: {}", getActiveConnections());
        logger.info("Idle connections: {}", getIdleConnections());
    }
}

12.8 安全问题

Q15: 如何在响应式应用中实现安全控制?

问题描述:需要在响应式应用中实现认证和授权。

解决方案

// 1. 方法级安全
/**
 * 1. Method-level security on a reactive service.
 */
@Service
public class SecureUserService {

    /**
     * Lists all users; guarded by the expression {@code hasRole('ADMIN')}.
     *
     * @return all users
     */
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        Flux<User> everyone = userRepository.findAll();
        return everyone;
    }

    /**
     * Loads a single user; guarded by an expression that allows the principal with the
     * same id or the ADMIN role.
     *
     * @param id the requested user id
     * @return the user, if found
     */
    @PreAuthorize("#id == authentication.principal.id or hasRole('ADMIN')")
    public Mono<User> getOwnUser(Long id) {
        Mono<User> requested = userRepository.findById(id);
        return requested;
    }
}

// 2. JWT 认证
/**
 * 2. Web filter that authenticates requests carrying a bearer token.
 *
 * <p>NOTE(review): {@code authenticationManager} is not declared in this snippet; it is
 * presumably an injected field of the real class.
 */
@Component
public class JwtAuthenticationFilter implements WebFilter {

    /**
     * Authenticates the bearer token, if present, and continues the chain exactly once.
     *
     * <p>The continuation is selected first and subscribed afterwards. Chaining
     * {@code switchIfEmpty(chain.filter(exchange))} directly behind a {@code flatMap}
     * that yields {@code Mono<Void>} always fires, because {@code Mono<Void>} never
     * emits a value, and that runs the rest of the chain a second time without
     * authentication.
     *
     * @param exchange the current exchange
     * @param chain the remaining filter chain
     * @return completion of the downstream chain; authentication errors propagate
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String authHeader = exchange.getRequest().getHeaders().getFirst("Authorization");

        if (authHeader != null && authHeader.startsWith("Bearer ")) {
            String token = authHeader.substring(7);

            return authenticationManager.authenticate(
                    new UsernamePasswordAuthenticationToken(null, token)
                )
                // Authenticated: continue with the security context populated.
                .map(authentication -> chain.filter(exchange)
                    .contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication)))
                // No authentication produced: continue unauthenticated.
                .switchIfEmpty(Mono.fromSupplier(() -> chain.filter(exchange)))
                .flatMap(continuation -> continuation);
        }

        return chain.filter(exchange);
    }
}

// 3. 输入验证
/**
 * 3. Validates a user's name and email before saving.
 *
 * @param user the user to create
 * @return the saved user, or a {@code ValidationException} error signal when the
 *         username is missing or empty, or the email is missing or fails
 *         {@code isValidEmail}
 */
public Mono<User> createUser(User user) {
    String username = user.getUsername();
    if (username == null || username.isEmpty()) {
        return Mono.error(new ValidationException("Username is required"));
    }

    String email = user.getEmail();
    boolean emailAcceptable = email != null && isValidEmail(email);
    if (!emailAcceptable) {
        return Mono.error(new ValidationException("Invalid email format"));
    }

    return userRepository.save(user);
}

// 4. 参数化查询防止 SQL 注入
/**
 * 4. Repository whose lookup is declared as a query method instead of hand-built SQL.
 */
public interface UserRepository extends R2dbcRepository<User, Long> {
    // Parameterized lookup: the username is passed as a method argument
    Mono<User> findByUsername(String username);
    
    // Never build the query by string concatenation
    // @Query("SELECT * FROM users WHERE username = '" + username + "'") // ❌ wrong
}

12.9 调试和监控

Q16: 如何调试响应式代码?

问题描述:响应式代码难以调试,需要找到问题所在。

解决方案

// 1. 使用 doOnNext/doOnError/doOnComplete 添加日志
/**
 * Processes the integers 1..10 through {@code processItem} and logs each stage of the stream.
 *
 * <p>Logged events: every item before processing, a failure of an individual
 * {@code processItem} call (with the item number), completion of the whole stream,
 * and a failure of the whole stream.
 *
 * <p>The throwable is passed as the trailing logger argument in addition to its message,
 * so the stack trace is kept in the log instead of being reduced to {@code getMessage()}.
 *
 * @return the results emitted by {@code processItem} for each item
 */
public Flux<String> debugStream() {
    return Flux.range(1, 10)
        .doOnNext(i -> logger.info("Processing item: {}", i))
        .flatMap(i -> processItem(i)
            // Per-item failure: logged here with the item number, then propagated.
            .doOnError(error ->
                logger.error("Error processing item {}: {}", i, error.getMessage(), error)
            )
        )
        .doOnComplete(() -> logger.info("Stream completed"))
        // Stream-level failure, including per-item errors that propagated.
        .doOnError(error -> logger.error("Stream error: {}", error.getMessage(), error));
}

// 2. 使用 checkpoint 添加检查点
/**
 * Runs the integers 1..10 through {@code processItem} and marks the resulting
 * sequence with the checkpoint description "After processing".
 *
 * @return the processed sequence with the checkpoint applied
 */
public Flux<String> withCheckpoint() {
    Flux<Integer> numbers = Flux.range(1, 10);
    Flux<String> processed = numbers.flatMap(item -> processItem(item));
    // Checkpoint is applied after the flatMap stage.
    return processed.checkpoint("After processing");
}

// 3. 使用 Reactor Debug 模式
// 在应用启动时(创建任何 Flux/Mono 之前)调用 Hooks.onOperatorDebug() 开启;该模式开销较大,生产环境建议改用 reactor-tools 提供的 ReactorDebugAgent.init()

// 4. 使用 log() 操作符
/**
 * Applies the {@code log("debug-stream")} operator to the integers 1..10 and then
 * runs them through {@code processItem}.
 *
 * @return the sequence produced by {@code processItem} for each item
 */
public Flux<String> withLog() {
    // log() sits before flatMap, so it observes the source range rather than the processed output.
    Flux<Integer> logged = Flux.range(1, 10).log("debug-stream");
    return logged.flatMap(item -> processItem(item));
}

// 5. 使用 context 传递调试信息
/**
 * Demonstrates passing a debug identifier through the Reactor context.
 *
 * <p>A random UUID string is written under the key {@code "debugId"}; the deferred
 * source reads it (falling back to {@code "unknown"}), logs it, and emits {@code "Result"}.
 *
 * @return a Mono emitting {@code "Result"}
 */
public Mono<String> debugWithContext() {
    Mono<String> source = Mono.deferContextual(ctx -> {
        String id = ctx.getOrDefault("debugId", "unknown");
        logger.info("Processing with debug ID: {}", id);
        return Mono.just("Result");
    });

    // The context is written downstream of the source so the deferred lambda can read it.
    return source.contextWrite(ctx -> ctx.put("debugId", UUID.randomUUID().toString()));
}

Q17: 如何监控响应式应用?

问题描述:需要监控响应式应用的性能和健康状况。

解决方案

// 1. 使用 Micrometer 收集指标
/**
 * Collects a request counter ({@code http.requests}) and a response timer
 * ({@code http.response.time}) in the supplied {@link MeterRegistry}.
 */
@Component
public class MetricsCollector {

    private final MeterRegistry meterRegistry;
    private final Counter requestCounter;
    private final Timer responseTimer;

    /**
     * Registers the counter and the timer with the given registry.
     *
     * @param meterRegistry registry that receives both meters
     */
    public MetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.requestCounter = Counter.builder("http.requests")
            .description("Number of HTTP requests")
            .register(meterRegistry);
        this.responseTimer = Timer.builder("http.response.time")
            .description("HTTP response time")
            .register(meterRegistry);
    }

    /**
     * Counts the request and decorates {@code response} so that its duration is recorded.
     *
     * <p>The counter is incremented when this method is called. The timer measures from
     * subscription to termination: the start time is taken inside {@code Mono.defer},
     * because {@code response} is lazy and nothing runs until it is subscribed.
     * {@code doFinally} records the duration for success, error and cancellation alike,
     * so failed requests are no longer missing from the timer.
     *
     * @param request  the request being handled (not used by the measurement itself)
     * @param response the lazy response pipeline to measure
     * @return {@code response} with timing attached
     */
    public Mono<ServerResponse> withMetrics(ServerRequest request, Mono<ServerResponse> response) {
        requestCounter.increment();

        return Mono.defer(() -> {
            // nanoTime is monotonic; currentTimeMillis can jump with wall-clock adjustments.
            long startNanos = System.nanoTime();
            return response.doFinally(signal ->
                responseTimer.record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS));
        });
    }
}

// 2. 使用 Reactor 的 hooks 监控
@Component
public class ReactorHooks implements ApplicationListener<ContextRefreshedEvent> {
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 监控所有错误
        Hooks.onOperatorDebug();
        
        // 监控所有操作符
        Hooks.onEachOperator(Operators.lift((sc, sub) -> {
            // 自定义监控逻辑
            return new CoreSubscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }
                
                @Override
                public void onNext(Object o) {
                    // 记录数据
                    sub.onNext(o);
                }
                
                @Override
                public void onError(Throwable t) {
                    // 记录错误
                    sub.onError(t);
                }
                
                @Override
                public void onComplete() {
                    // 记录完成
                    sub.onComplete();
                }
            };
        }));
    }
}

// 3. 自定义健康检查端点(复用 Actuator 的 Health 类型构建响应)
/**
 * Controller exposing {@code GET /health}, which reports the combined state of the
 * database check and the external-services check.
 */
@RestController
public class HealthController {

    /**
     * Runs both checks on the bounded-elastic scheduler and builds a {@code Health} from them.
     *
     * @return an UP health when both checks pass, otherwise a DOWN health; in both cases the
     *         details {@code database} and {@code external} carry "OK" or "DOWN" per check
     */
    @GetMapping("/health")
    public Mono<Health> health() {
        return Mono.fromCallable(() -> {
                // Both checks are always evaluated, database first.
                boolean databaseOk = checkDatabase();
                boolean externalOk = checkExternalServices();

                // Overall status is UP only when every check passed; the per-check details
                // use the same "OK"/"DOWN" mapping in either case.
                return (databaseOk && externalOk ? Health.up() : Health.down())
                    .withDetail("database", databaseOk ? "OK" : "DOWN")
                    .withDetail("external", externalOk ? "OK" : "DOWN")
                    .build();
            })
            .subscribeOn(Schedulers.boundedElastic());
    }
}

12.10 迁移问题

Q18: 如何从传统 Spring MVC 迁移到 WebFlux?

问题描述:需要将现有的 Spring MVC 应用迁移到 WebFlux。

解决方案

// 1. Controller 返回类型变更
// 传统 Spring MVC
// Spring MVC handler for GET /users/{id}: returns the User obtained from
// userService.findById(id) directly as the method result.
@GetMapping("/users/{id}")
public User getUser(@PathVariable Long id) {
    return userService.findById(id);
}

// WebFlux
// WebFlux handler for GET /users/{id}: same mapping and delegation as the MVC version,
// but the return type is Mono<User> instead of User.
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    return userService.findById(id);
}

// 2. Service 层变更
// 传统 Spring MVC
/**
 * Blocking service of the traditional stack.
 *
 * <p>Spring Data's {@code CrudRepository.findById} returns {@code Optional<User>}, so the
 * value is unwrapped here; {@code null} is returned when no user with the id exists.
 */
@Service
public class UserService {
    public User findById(Long id) {
        return userRepository.findById(id).orElse(null);
    }
}

// WebFlux
/**
 * Reactive service: same method name, but the result is a {@code Mono<User>} handed
 * through from the reactive repository.
 */
@Service
public class UserService {
    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }
}

// 3. Repository layer changes
// Traditional Spring Data JPA
/**
 * JPA repository of the traditional stack.
 *
 * <p>{@code findById} is redeclared only to make the signature visible. Its return type must
 * be {@code Optional<User>}: declaring {@code User findById(Long)} clashes with the method
 * inherited from {@code CrudRepository} and does not compile.
 */
public interface UserRepository extends JpaRepository<User, Long> {
    Optional<User> findById(Long id);
}

// WebFlux Spring Data R2DBC
/**
 * R2DBC repository of the reactive stack; {@code findById} yields a {@code Mono<User>}.
 */
public interface UserRepository extends R2dbcRepository<User, Long> {
    Mono<User> findById(Long id);
}

// 4. 配置变更
// 传统 Spring MVC
// Entry point of the Spring MVC variant: passes Application.class and the
// command-line arguments to SpringApplication.run.
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

// WebFlux
// Entry point of the WebFlux variant. The code is identical to the Spring MVC
// entry point: Application.class and the arguments are passed to SpringApplication.run.
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

// 5. 依赖变更
// 传统 Spring MVC
// Build dependencies of the Spring MVC variant: the web starter and the Spring Data JPA starter.
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
}

// WebFlux
// Build dependencies of the WebFlux variant: the webflux starter replaces the web starter,
// and the Spring Data R2DBC starter replaces the JPA starter.
// NOTE(review): an R2DBC driver for the target database is presumably needed as well — confirm.
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
}

Q19: 如何逐步迁移到响应式编程?

问题描述:需要逐步迁移,而不是一次性重写整个应用。

解决方案

// 1. 使用 Spring MVC 和 WebFlux 共存(同时引入两个 starter 时,Spring Boot 默认以 Spring MVC 方式运行,控制器仍可返回 Mono/Flux)
// Entry point of the hybrid application used during step-by-step migration:
// passes HybridApplication.class and the arguments to SpringApplication.run.
@SpringBootApplication
public class HybridApplication {
    public static void main(String[] args) {
        SpringApplication.run(HybridApplication.class, args);
    }
}

// 2. 在 Spring MVC 中使用响应式组件
/**
 * Blocking controller under {@code /api/v1} that calls the reactive service and waits
 * for its result, so existing blocking clients keep working during migration.
 */
@RestController
@RequestMapping("/api/v1")
public class LegacyController {

    private final ReactiveUserService reactiveUserService;

    /**
     * Constructor injection; without it the {@code final} field is never initialised
     * and the class does not compile.
     *
     * @param reactiveUserService the reactive service to delegate to
     */
    public LegacyController(ReactiveUserService reactiveUserService) {
        this.reactiveUserService = reactiveUserService;
    }

    /**
     * @param id the user id from the path
     * @return the user, obtained by blocking on the reactive lookup
     */
    @GetMapping("/users/{id}")
    public User getUser(@PathVariable Long id) {
        // Blocking call into the reactive service.
        return reactiveUserService.findById(id).block();
    }
}

/**
 * Fully reactive controller under {@code /api/v2}: the {@code Mono} from the reactive
 * service is returned as-is, with no blocking.
 */
@RestController
@RequestMapping("/api/v2")
public class ReactiveController {

    private final ReactiveUserService reactiveUserService;

    /**
     * Constructor injection; without it the {@code final} field is never initialised
     * and the class does not compile.
     *
     * @param reactiveUserService the reactive service to delegate to
     */
    public ReactiveController(ReactiveUserService reactiveUserService) {
        this.reactiveUserService = reactiveUserService;
    }

    /**
     * @param id the user id from the path
     * @return the lookup result as a {@code Mono}
     */
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        // Fully reactive: no block().
        return reactiveUserService.findById(id);
    }
}

// 3. 使用适配器模式
/**
 * Adapter that exposes the reactive user service through a blocking API, waiting at most
 * five seconds for each call.
 *
 * <p>NOTE(review): {@code block(Duration)} presumably fails with an exception when the
 * timeout elapses, and blocking is presumably not permitted on Reactor's non-blocking
 * threads — confirm before calling this adapter from reactive code.
 */
public class BlockingUserServiceAdapter {

    private final ReactiveUserService reactiveUserService;

    /**
     * Without this constructor the {@code final} field is never initialised
     * and the class does not compile.
     *
     * @param reactiveUserService the reactive service to adapt
     */
    public BlockingUserServiceAdapter(ReactiveUserService reactiveUserService) {
        this.reactiveUserService = reactiveUserService;
    }

    /**
     * @param id the user id
     * @return the user, waiting up to five seconds for the reactive lookup
     */
    public User findById(Long id) {
        return reactiveUserService.findById(id)
            .block(Duration.ofSeconds(5));
    }

    /**
     * @return all users collected into a list, waiting up to five seconds
     */
    public List<User> findAll() {
        return reactiveUserService.findAll()
            .collectList()
            .block(Duration.ofSeconds(5));
    }
}

通过了解这些常见问题和解决方案,开发者可以更好地应对响应式编程中的各种挑战,构建更加稳定、高效的响应式应用程序。