目录

响应式编程框架Reactor8

响应式编程框架Reactor【8】

十三、性能优化与最佳实践

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class PerformanceBestPractices {
    
    // 1. 合理使用调度器
    public Flux<String> ioIntensiveOperation(Flux<String> inputs) {
        return inputs
            .flatMap(input -> 
                Mono.fromCallable(() -> blockingIoOperation(input))
                    .subscribeOn(Schedulers.boundedElastic()) // IO操作使用弹性线程池
            );
    }
    
    private String blockingIoOperation(String input) {
        // 模拟阻塞IO操作
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input.toUpperCase();
    }
    
    // 2. 避免在响应式链中进行阻塞操作
    public Flux<String> nonBlockingAlternative(Flux<String> inputs) {
        return inputs
            .delayElements(Duration.ofMillis(100)) // 使用延迟而不是睡眠
            .map(String::toUpperCase);
    }
    
    // 3. 合理使用缓存
    public Mono<String> getWithCache(String key) {
        return Mono.fromCallable(() -> expensiveOperation(key))
            .cache(Duration.ofMinutes(5)) // 缓存5分钟
            .subscribeOn(Schedulers.parallel());
    }
    
    private String expensiveOperation(String key) {
        // 模拟昂贵操作
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Processed: " + key;
    }
    
    // 4. 背压感知处理
    public Flux<Data> processWithBackpressureAwareness(Flux<Data> dataStream) {
        AtomicInteger counter = new AtomicInteger();
        
        return dataStream
            .onBackpressureBuffer(1000, 
                data -> System.out.println("Dropping: " + data)
            )
            .doOnNext(data -> {
                if (counter.incrementAndGet() % 100 == 0) {
                    System.out.println("Processed " + counter.get() + " items");
                }
            })
            .subscribeOn(Schedulers.parallel());
    }
    
    // 5. 合理使用flatMap与concatMap
    public Flux<Result> processWithConcurrencyControl(Flux<Request> requests) {
        return requests
            .flatMap(request -> 
                processRequest(request)
                    .subscribeOn(Schedulers.parallel()),
                10 // 最大并发数
            );
    }
    
    private Mono<Result> processRequest(Request request) {
        return Mono.fromCallable(() -> {
            // 处理请求
            return new Result("Processed: " + request.getId());
        });
    }
    
    // 6. 监控与指标收集
    public Flux<Data> processWithMetrics(Flux<Data> dataStream) {
        return dataStream
            .name("dataProcessing") // 为操作命名以便监控
            .metrics() // 收集指标
            .doOnNext(data -> {
                // 业务逻辑
            })
            .doOnError(error -> {
                // 错误处理与记录
            });
    }
}

// 简单的数据模型
class Data {
    private String content;
    // getters and setters
}

class Request {
    private String id;
    // getters and setters
}

class Result {
    private String message;
    // constructor, getters
}

https://i-blog.csdnimg.cn/direct/9f8e18e1a4934e489476657757d081a9.png

十四、Reactor工作原理与流程图

14.1 Reactor执行流程

Subscriber

Publisher (Flux/Mono)

Operators

Scheduler

subscribe()

创建操作链

安排执行(如果需要)

在指定线程执行

onSubscribe(Subscription)

request(n)

请求数据

onNext(data)

应用转换/过滤

onNext(processedData)

request(m) (更多数据)

onComplete() (数据完成)

onComplete()

错误处理路径

onError(throwable)

onError(throwable)

Subscriber

Publisher (Flux/Mono)

Operators

Scheduler

14.2 Reactor背压机制

不能

BUFFER

DROP

LATEST

ERROR

Publisher 产生数据

Subscriber 能否处理?

发送数据 onNext

更新请求计数

背压策略

缓冲数据

丢弃数据

丢弃最旧, 保持最新

发出错误信号

缓冲区满?

应用溢出策略

还有请求额度?

等待新请求

十五、Reactor Context API详解

Reactor 的 Context API 提供了一种在反应式流中传递上下文信息的机制,类似于传统编程中的 ThreadLocal,但专门为反应式编程设计。它允许在流的处理过程中传递和访问上下文数据,而不会破坏反应式流的不可变性和链式特性。

15.1 Context的设计目的

  1. 跨操作符传递数据:在反应式链的不同操作符之间传递上下文信息
  2. 避免方法参数污染:不需要将上下文数据作为参数在每个方法间传递
  3. 线程安全:适用于反应式编程中线程切换的场景
  4. 与订阅相关:每个订阅都有自己独立的 Context

15.2 Context 与 ThreadLocal 的区别

上下文传递机制

ThreadLocal

Reactor Context

基于线程存储

同步编程适用

线程切换时问题

基于订阅存储

反应式编程适用

线程安全

15.3 Context基本用法

15.3.1 创建和访问 Context

package cn.tcmeta.context;

import reactor.util.context.Context;

/**
 * @author: laoren
 * @description: Context基本示例
 * @version: 1.0.0
 */
public class ContextBasicExample {
    public static void main(String[] args) {
        // 1. 创建Context对象
        Context context = Context.of("user", "jack", "requestID", "1234");

        // 2. 读取Context对象当中的值
        String username = context.get("user");
        var requestID = context.get("requestID");
        System.out.println("username: " + username);
        System.out.println("requestID: " + requestID);

        // 3. 使用【getOrDefault】安全获取值
        String hasUser = context.getOrDefault("userRole", "defaultRole");
        System.out.println("hasUser: " + hasUser);

        // 4. 检查是否存在
        boolean hasUserRole = context.hasKey("userRole");
        System.out.println("hasUserRole: " + hasUserRole);

        // 5. 创建新的Context对象【Context是不可变的】
        Context newContext = context.put("department", "工程师");
        System.out.println("department: " + newContext.get("department"));

        // 6. 删除键
        Context withoutRequestId = newContext.delete("requestId");
        System.out.println("withoutRequestId: " + withoutRequestId.get("requestId"));
    }
}

https://i-blog.csdnimg.cn/direct/3495b74ae18d4e4b801cfc9ede0b17c6.png

15.3.2 在反应式流当中使用Context

1. 使用 contextWrite 将 Context 注入流中

// 1. 使用 contextWrite 将 Context 注入流中
Mono<String> result = Mono.just("Hello")
    .flatMap(value ->
             Mono.deferContextual(ctx -> {
                 String user = ctx.get("user");
                 return Mono.just(value + " " + user);
             })
            )
    .contextWrite(Context.of("user", "Alice"));

result.subscribe(System.out::println); // 输出: Hello Alice

2. 多层Context写入

// 多层 Context 写入
Mono<String> multiLayer = Mono.just("Message")
    .flatMap(value ->
             Mono.deferContextual(ctx -> {
                 String user = ctx.get("user");
                 String role = ctx.get("role");
                 return Mono.just(value + " for " + user + " (" + role + ")");
             })
            )
    .contextWrite(Context.of("role", "admin")) // 第二层
    .contextWrite(Context.of("user", "Bob"));   // 第一层(最外层)

multiLayer.subscribe(System.out::println); // 输出: Message for Bob (admin)

3. Context 的读取顺序是从内到外

Mono<String> orderExample = Mono.deferContextual(ctx -> {
    // 这里会读取到最内层的 user
    String user = ctx.get("user");
    return Mono.just("User: " + user);
})
    .contextWrite(Context.of("user", "inner"))  // 内层
    .contextWrite(Context.of("user", "outer")); // 外层(会被内层覆盖)

orderExample.subscribe(System.out::println); // 输出: User: inner

15.4 Context高级用法

15.4.1 在操作符之间传递Context

package cn.tcmeta.context;

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/**
 * @author: laoren
 * @date: 2025/8/27 19:46
 * @description: 在操作符之间传递 Context
 * @version: 1.0.0
 */
public class ContextAcrossOperators {
    static class UserService {
        Mono<String> getUserName(String userId) {
            return Mono.deferContextual(ctx -> {
                String requestId = ctx.getOrDefault("requestId", "unknow");
                System.out.println("requestId: " + requestId + " - userId: " + userId);
                return Mono.just("User_" + userId);
            });
        }
    }

    static class OrderService {
        Mono<String> getOrderInfo(String orderId) {
            return Mono.deferContextual(ctx -> {
                String requestId = ctx.getOrDefault("requestId", "unknow");
                String user = ctx.getOrDefault("user", "unknow");
                System.out.println("requestId: " + requestId + " - user: " + user + " - orderId: " + orderId);
                return Mono.just("Order_" + orderId + " _for_ " + user);
            });
        }
    }

    public static void main(String[] args) {
        UserService userService = new UserService();
        OrderService orderService = new OrderService();

        // 在流处理过程中传递 Context
        Mono<String> result = Mono.just("123")
                .flatMap(userService::getUserName)
                .flatMap(userName ->
                        Mono.deferContextual(ctx -> {
                            // 将用户信息存入 Context
                            return Mono.just("order456")
                                    .flatMap(orderService::getOrderInfo)
                                    .contextWrite(Context.of("user", userName));
                        })
                )
                .contextWrite(Context.of("requestId", "req-789"));

        result.subscribe(System.out::println);
        // 输出:
        // Getting user 123 with requestId: req-789
        // Getting order order456 for User_123 with requestId: req-789
        // Order_order456_for_User_123
    }
}

15.4.2 Context 与错误处理

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextErrorHandling {
    
    public static void main(String[] args) {
        // 在错误处理中访问 Context
        Mono<String> result = Mono.error(new RuntimeException("Something went wrong"))
            .onErrorResume(e -> 
                Mono.deferContextual(ctx -> {
                    String requestId = ctx.getOrDefault("requestId", "unknown");
                    String errorMessage = "Error in request " + requestId + ": " + e.getMessage();
                    return Mono.just(errorMessage);
                })
            )
            .contextWrite(Context.of("requestId", "req-123"));
        
        result.subscribe(System.out::println); // 输出: Error in request req-123: Something went wrong
        
        // 在 doOnError 中访问 Context
        Mono<String> withDoOnError = Mono.just("data")
            .flatMap(d -> Mono.error(new RuntimeException("Processing error")))
            .doOnError(e -> 
                Mono.deferContextual(ctx -> {
                    String requestId = ctx.get("requestId");
                    System.err.println("Error occurred for request: " + requestId);
                    return Mono.empty();
                }).subscribe() // 注意:需要订阅这个内部流
            )
            .onErrorResume(e -> Mono.just("fallback"))
            .contextWrite(Context.of("requestId", "req-456"));
        
        withDoOnError.subscribe(System.out::println);
        // 输出: 
        // Error occurred for request: req-456
        // fallback
    }
}

15.4.3 嵌套 Context 处理

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class NestedContextExample {
    
    public static void main(String[] args) {
        // 模拟外部服务调用
        Mono<String> externalServiceCall = Mono.deferContextual(ctx -> {
            String authToken = ctx.get("authToken");
            String requestId = ctx.get("requestId");
            System.out.println("Calling service with token: " + authToken + ", requestId: " + requestId);
            return Mono.just("Service response");
        });
        
        // 嵌套 Context 使用
        Mono<String> result = Mono.just("process")
            .flatMap(operation -> 
                Mono.deferContextual(outerCtx -> {
                    String userId = outerCtx.get("userId");
                    
                    // 创建内部 Context(添加认证令牌)
                    Context innerContext = outerCtx.put("authToken", "token-for-" + userId);
                    
                    return externalServiceCall
                        .contextWrite(innerContext)
                        .flatMap(serviceResponse -> 
                            Mono.deferContextual(innerCtx -> {
                                String requestId = innerCtx.get("requestId");
                                return Mono.just("Processed: " + serviceResponse + " for user " + userId + " (request: " + requestId + ")");
                            })
                        );
                })
            )
            .contextWrite(Context.of("userId", "alice", "requestId", "req-123"));
        
        result.subscribe(System.out::println);
        // 输出:
        // Calling service with token: token-for-alice, requestId: req-123
        // Processed: Service response for user alice (request: req-123)
    }
}

15.5 Context在实际应用中的作用

15.5.1 Web应用中的请求上下文

import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.web.server.ServerWebExchange;

public class WebContextExample {
    
    // 模拟的 Web 处理器
    static class WebHandler {
        Mono<String> handleRequest(ServerWebExchange exchange) {
            return Mono.just("request-data")
                .flatMap(data -> processData(data))
                .contextWrite(Context.of(
                    "exchange", exchange,
                    "requestId", exchange.getRequest().getId(),
                    "user", getCurrentUser(exchange)
                ));
        }
        
        Mono<String> processData(String data) {
            return Mono.deferContextual(ctx -> {
                ServerWebExchange exchange = ctx.get("exchange");
                String requestId = ctx.get("requestId");
                String user = ctx.get("user");
                
                System.out.println("Processing data for user: " + user + ", requestId: " + requestId);
                return Mono.just("Processed: " + data);
            });
        }
        
        String getCurrentUser(ServerWebExchange exchange) {
            // 模拟从 exchange 中获取用户信息
            return "user-alice";
        }
    }
    
    // 模拟的 ServerWebExchange
    static class MockServerWebExchange {
        private MockServerRequest request = new MockServerRequest();
        
        public MockServerRequest getRequest() {
            return request;
        }
    }
    
    static class MockServerRequest {
        public String getId() {
            return "req-12345";
        }
    }
    
    public static void main(String[] args) {
        WebHandler handler = new WebHandler();
        MockServerWebExchange exchange = new MockServerWebExchange();
        
        handler.handleRequest(exchange)
            .subscribe(System.out::println);
        // 输出: 
        // Processing data for user: user-alice, requestId: req-12345
        // Processed: request-data
    }
}

15.5.2 分布式追踪与日志

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.util.UUID;

public class TracingContextExample {
    
    // 模拟的分布式追踪器
    static class Tracer {
        void logEvent(String event, String requestId) {
            System.out.println("[" + requestId + "] " + event);
        }
        
        Mono<String> startSpan(String name, String requestId) {
            return Mono.fromCallable(() -> {
                String spanId = UUID.randomUUID().toString();
                logEvent("Started span: " + name + " (" + spanId + ")", requestId);
                return spanId;
            });
        }
        
        Mono<Void> endSpan(String spanId, String requestId) {
            return Mono.fromRunnable(() -> 
                logEvent("Ended span: " + spanId, requestId)
            );
        }
    }
    
    public static void main(String[] args) {
        Tracer tracer = new Tracer();
        String requestId = UUID.randomUUID().toString();
        
        Flux.range(1, 3)
            .flatMap(i -> 
                Mono.deferContextual(ctx -> {
                    String currentRequestId = ctx.get("requestId");
                    
                    return tracer.startSpan("process-item-" + i, currentRequestId)
                        .flatMap(spanId -> 
                            processItem(i)
                                .doOnSuccess(result -> 
                                    tracer.endSpan(spanId, currentRequestId).subscribe()
                                )
                                .doOnError(error -> 
                                    tracer.endSpan(spanId, currentRequestId).subscribe()
                                )
                        );
                })
            )
            .contextWrite(Context.of("requestId", requestId))
            .subscribe();
        
        // 输出示例:
        // [a1b2c3d4...] Started span: process-item-1 (e5f6g7h8...)
        // [a1b2c3d4...] Ended span: e5f6g7h8...
        // [a1b2c3d4...] Started span: process-item-2 (i9j0k1l2...)
        // [a1b2c3d4...] Ended span: i9j0k1l2...
        // [a1b2c3d4...] Started span: process-item-3 (m3n4o5p6...)
        // [a1b2c3d4...] Ended span: m3n4o5p6...
    }
    
    static Mono<String> processItem(int item) {
        return Mono.just("Processed item " + item)
            .delayElement(java.time.Duration.ofMillis(100));
    }
}

15.5.3 权限验证与安全上下文

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class SecurityContextExample {
    
    // 模拟的用户认证信息
    static class Authentication {
        private final String username;
        private final String[] roles;
        
        public Authentication(String username, String[] roles) {
            this.username = username;
            this.roles = roles;
        }
        
        public String getUsername() {
            return username;
        }
        
        public String[] getRoles() {
            return roles;
        }
        
        public boolean hasRole(String role) {
            for (String r : roles) {
                if (r.equals(role)) {
                    return true;
                }
            }
            return false;
        }
    }
    
    // 模拟的资源服务
    static class ResourceService {
        Mono<String> getProtectedResource(String resourceId) {
            return Mono.deferContextual(ctx -> {
                Authentication auth = ctx.get("auth");
                
                if (!auth.hasRole("ADMIN")) {
                    return Mono.error(new SecurityException("Access denied"));
                }
                
                return Mono.just("Protected resource: " + resourceId);
            });
        }
    }
    
    public static void main(String[] args) {
        ResourceService service = new ResourceService();
        
        // 有权限的用户
        Authentication adminAuth = new Authentication("alice", new String[]{"USER", "ADMIN"});
        
        service.getProtectedResource("secret-data")
            .contextWrite(Context.of("auth", adminAuth))
            .subscribe(
                System.out::println,
                error -> System.err.println("Error: " + error.getMessage())
            );
        // 输出: Protected resource: secret-data
        
        // 无权限的用户
        Authentication userAuth = new Authentication("bob", new String[]{"USER"});
        
        service.getProtectedResource("secret-data")
            .contextWrite(Context.of("auth", userAuth))
            .subscribe(
                System.out::println,
                error -> System.err.println("Error: " + error.getMessage())
            );
        // 输出: Error: Access denied
    }
}

15.6 Context 工作原理与流程图

15.6.1 Context 在反应式流中的传递

Subscriber

Operator 1

Operator 2

Operator 3

subscribe()

向下游传递

向下游传递

contextWrite(ContextA)

处理数据 + ContextA

处理数据 + ContextA

最终结果

contextWrite(ContextB)

处理数据 + ContextB

最终结果

Subscriber

Operator 1

Operator 2

Operator 3

15.6.2 Context 的读取顺序

读取 Context 值

当前操作符有 Context?

读取当前 Context

向上游查找 Context

找到 Context?

返回找到的值

使用默认值或抛出异常

返回找到的值

15.7 最佳实践与注意事项

15.7.1 合理使用 Context

// 好的做法:将相关的上下文信息放在 Context 中
Context context = Context.of(
    "requestId", "req-123",
    "userId", "user-alice",
    "traceId", "trace-456"
);

// 不好的做法:将不相关或过多的数据放在 Context 中
Context badContext = Context.of(
    "requestId", "req-123",
    "unrelatedData", largeObject, // 大对象不适合放在 Context 中
    "config", configMap // 配置信息应该通过其他方式传递
);

15.7.2 Context 键命名规范

// 使用有意义的键名,避免冲突
public static final String KEY_REQUEST_ID = "com.example.requestId";
public static final String KEY_USER = "com.example.user";

Context context = Context.of(KEY_REQUEST_ID, "req-123", KEY_USER, "alice");

16.7.3 错误处理

Mono.deferContextual(ctx -> {
    // 使用 getOrDefault 避免 NoSuchElementException
    String value = ctx.getOrDefault("key", "default");
    
    // 或者使用 hasKey 检查
    if (ctx.hasKey("key")) {
        return Mono.just(ctx.get("key"));
    } else {
        return Mono.error(new IllegalArgumentException("Missing key"));
    }
});

15.8 注意事项

15.8.1 Context是不可变的

Context original = Context.of("key", "value");
Context updated = original.put("newKey", "newValue"); // 返回新对象

System.out.println(original.hasKey("newKey")); // false
System.out.println(updated.hasKey("newKey"));   // true

15.8.2 Context 与线程安全

// Context 是线程安全的,可以在多线程环境中使用
Mono.fromCallable(() -> {
        // 在另一个线程中访问 Context
        return "result";
    })
    .flatMap(result -> 
        Mono.deferContextual(ctx -> {
            String requestId = ctx.get("requestId"); // 安全访问
            return Mono.just(result + " for " + requestId);
        })
    )
    .contextWrite(Context.of("requestId", "req-123"))
    .subscribeOn(Schedulers.parallel()) // 切换到并行调度器
    .subscribe();

15.8.3 性能考虑

// 避免在热点路径中频繁创建大的 Context
Flux.range(1, 1000)
    .contextWrite(Context.of("data", largeData)) // 大的数据对象
    .subscribe(); // 这会在每个元素上创建 Context 副本

// 更好的做法:将大数据存储在外部,Context 中只存储引用
Flux.range(1, 1000)
    .contextWrite(Context.of("dataRef", dataReference))
    .subscribe();

15.8 总结

Reactor 的 Context API 提供了在反应式流中传递上下文信息的强大机制。通过本文的详细讲解,我们可以总结出以下几点:

  1. 核心特性
    • 基于订阅的上下文存储
    • 不可变的数据结构
    • 线程安全的访问机制
    • 支持多层上下文嵌套
  2. 适用场景
    • 请求范围的数据传递(如请求ID、用户信息)
    • 分布式追踪和日志记录
    • 权限验证和安全上下文
    • 跨操作符的配置传递
  3. 最佳实践
    • 合理选择存储在 Context 中的数据
    • 使用有意义的键名避免冲突
    • 使用 getOrDefault 或 hasKey 安全访问数据
    • 避免在 Context 中存储大对象
  4. 注意事项
    • Context 是不可变的,每次修改都会创建新对象
    • Context 的读取顺序是从内到外
    • 需要考虑性能影响,避免过度使用

Context API 是 Reactor 框架中处理上下文传递的标准方式,掌握了它的使用能够更好地构建可维护、可追踪的反应式应用程序。

十六、总结与实践经验

16.1 Reactor核心优势

  1. 高效的资源利用:通过非阻塞IO和合理的线程模型,实现高并发处理
  2. 强大的背压处理:内置多种背压策略,防止系统过载
  3. 丰富的操作符:提供函数式、声明式的数据处理方式
  4. 与Spring生态完美集成:Spring WebFlux、Spring Data R2DBC等
  5. 良好的测试支持:StepVerifier等工具简化测试编写

16.2 实践经验总结

  1. 线程模型选择
    • CPU密集型任务使用 Schedulers.parallel()
    • IO密集型任务使用 Schedulers.boundedElastic()
    • 避免在响应式链中阻塞线程
  2. 背压策略选择
    • 实时性要求高:onBackpressureDrop
    • 数据完整性重要:onBackpressureBuffer
    • 平衡实时与完整:onBackpressureLatest
  3. 错误处理原则
    • 使用 onErrorResume 提供降级方案
    • 使用 retryWhen 实现智能重试
    • 记录错误但保持流继续运行
  4. 性能优化技巧
    • 合理使用 cache 避免重复计算
    • 使用 windowbuffer 进行批量处理
    • 控制 flatMap 的并发度
  5. 调试与监控
    • 使用 .name().metrics() 进行监控
    • 使用 Hooks.onOperatorDebug() 调试复杂流
    • 利用 StepVerifier 进行全面测试

16.3 适用场景

  1. 高并发Web应用:特别是微服务架构中的API网关、聚合服务
  2. 实时数据处理:聊天应用、实时通知、实时分析
  3. 流式ETL管道:数据转换、 enrichment、聚合
  4. 响应式数据库访问:使用R2DBC进行非阻塞数据库操作
  5. 消息驱动架构:与Kafka、RabbitMQ等消息中间件集成

Reactor 框架为Java开发者提供了强大的响应式编程能力,通过合理运用其丰富的操作符和灵活的线程模型,可以构建出高性能、高弹性的现代应用程序。