响应式编程框架Reactor8
响应式编程框架Reactor【8】
十三、性能优化与最佳实践
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class PerformanceBestPractices {
// 1. 合理使用调度器
public Flux<String> ioIntensiveOperation(Flux<String> inputs) {
return inputs
.flatMap(input ->
Mono.fromCallable(() -> blockingIoOperation(input))
.subscribeOn(Schedulers.boundedElastic()) // IO操作使用弹性线程池
);
}
private String blockingIoOperation(String input) {
// 模拟阻塞IO操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return input.toUpperCase();
}
// 2. 避免在响应式链中进行阻塞操作
public Flux<String> nonBlockingAlternative(Flux<String> inputs) {
return inputs
.delayElements(Duration.ofMillis(100)) // 使用延迟而不是睡眠
.map(String::toUpperCase);
}
// 3. 合理使用缓存
public Mono<String> getWithCache(String key) {
return Mono.fromCallable(() -> expensiveOperation(key))
.cache(Duration.ofMinutes(5)) // 缓存5分钟
.subscribeOn(Schedulers.parallel());
}
private String expensiveOperation(String key) {
// 模拟昂贵操作
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + key;
}
// 4. 背压感知处理
public Flux<Data> processWithBackpressureAwareness(Flux<Data> dataStream) {
AtomicInteger counter = new AtomicInteger();
return dataStream
.onBackpressureBuffer(1000,
data -> System.out.println("Dropping: " + data)
)
.doOnNext(data -> {
if (counter.incrementAndGet() % 100 == 0) {
System.out.println("Processed " + counter.get() + " items");
}
})
.subscribeOn(Schedulers.parallel());
}
// 5. 合理使用flatMap与concatMap
public Flux<Result> processWithConcurrencyControl(Flux<Request> requests) {
return requests
.flatMap(request ->
processRequest(request)
.subscribeOn(Schedulers.parallel()),
10 // 最大并发数
);
}
private Mono<Result> processRequest(Request request) {
return Mono.fromCallable(() -> {
// 处理请求
return new Result("Processed: " + request.getId());
});
}
// 6. 监控与指标收集
public Flux<Data> processWithMetrics(Flux<Data> dataStream) {
return dataStream
.name("dataProcessing") // 为操作命名以便监控
.metrics() // 收集指标
.doOnNext(data -> {
// 业务逻辑
})
.doOnError(error -> {
// 错误处理与记录
});
}
}
// 简单的数据模型
class Data {
private String content;
// getters and setters
}
class Request {
private String id;
// getters and setters
}
class Result {
private String message;
// constructor, getters
}
十四、Reactor工作原理与流程图
14.1 Reactor执行流程
Subscriber
Publisher (Flux/Mono)
Operators
Scheduler
subscribe()
创建操作链
安排执行(如果需要)
在指定线程执行
onSubscribe(Subscription)
request(n)
请求数据
onNext(data)
应用转换/过滤
onNext(processedData)
request(m) (更多数据)
onComplete() (数据完成)
onComplete()
错误处理路径
onError(throwable)
onError(throwable)
Subscriber
Publisher (Flux/Mono)
Operators
Scheduler
14.2 Reactor背压机制
能
不能
BUFFER
DROP
LATEST
ERROR
是
否
是
否
Publisher 产生数据
Subscriber 能否处理?
发送数据 onNext
更新请求计数
背压策略
缓冲数据
丢弃数据
丢弃最旧, 保持最新
发出错误信号
缓冲区满?
应用溢出策略
还有请求额度?
等待新请求
十五、Reactor Context API详解
Reactor 的 Context API 提供了一种在反应式流中传递上下文信息的机制,类似于传统编程中的 ThreadLocal
,但专门为反应式编程设计。它允许在流的处理过程中传递和访问上下文数据,而不会破坏反应式流的不可变性和链式特性。
15.1 Context的设计目的
- 跨操作符传递数据:在反应式链的不同操作符之间传递上下文信息
- 避免方法参数污染:不需要将上下文数据作为参数在每个方法间传递
- 线程安全:适用于反应式编程中线程切换的场景
- 与订阅相关:每个订阅都有自己独立的 Context
15.2 Context 与 ThreadLocal 的区别
上下文传递机制
ThreadLocal
Reactor Context
基于线程存储
同步编程适用
线程切换时问题
基于订阅存储
反应式编程适用
线程安全
15.3 Context基本用法
15.3.1 创建和访问 Context
package cn.tcmeta.context;
import reactor.util.context.Context;
/**
* @author: laoren
* @description: Context基本示例
* @version: 1.0.0
*/
public class ContextBasicExample {
public static void main(String[] args) {
// 1. 创建Context对象
Context context = Context.of("user", "jack", "requestID", "1234");
// 2. 读取Context对象当中的值
String username = context.get("user");
var requestID = context.get("requestID");
System.out.println("username: " + username);
System.out.println("requestID: " + requestID);
// 3. 使用【getOrDefault】安全获取值
String hasUser = context.getOrDefault("userRole", "defaultRole");
System.out.println("hasUser: " + hasUser);
// 4. 检查是否存在
boolean hasUserRole = context.hasKey("userRole");
System.out.println("hasUserRole: " + hasUserRole);
// 5. 创建新的Context对象【Context是不可变的】
Context newContext = context.put("department", "工程师");
System.out.println("department: " + newContext.get("department"));
// 6. 删除键
Context withoutRequestId = newContext.delete("requestId");
System.out.println("withoutRequestId: " + withoutRequestId.get("requestId"));
}
}
15.3.2 在反应式流当中使用Context
1. 使用 contextWrite 将 Context 注入流中
// 1. 使用 contextWrite 将 Context 注入流中
Mono<String> result = Mono.just("Hello")
.flatMap(value ->
Mono.deferContextual(ctx -> {
String user = ctx.get("user");
return Mono.just(value + " " + user);
})
)
.contextWrite(Context.of("user", "Alice"));
result.subscribe(System.out::println); // 输出: Hello Alice
2. 多层Context写入
// 多层 Context 写入
Mono<String> multiLayer = Mono.just("Message")
.flatMap(value ->
Mono.deferContextual(ctx -> {
String user = ctx.get("user");
String role = ctx.get("role");
return Mono.just(value + " for " + user + " (" + role + ")");
})
)
.contextWrite(Context.of("role", "admin")) // 第二层
.contextWrite(Context.of("user", "Bob")); // 第一层(最外层)
multiLayer.subscribe(System.out::println); // 输出: Message for Bob (admin)
3. Context 的读取顺序是从内到外
Mono<String> orderExample = Mono.deferContextual(ctx -> {
// 这里会读取到最内层的 user
String user = ctx.get("user");
return Mono.just("User: " + user);
})
.contextWrite(Context.of("user", "inner")) // 内层
.contextWrite(Context.of("user", "outer")); // 外层(会被内层覆盖)
orderExample.subscribe(System.out::println); // 输出: User: inner
15.4 Context高级用法
15.4.1 在操作符之间传递Context
package cn.tcmeta.context;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
/**
* @author: laoren
* @date: 2025/8/27 19:46
* @description: 在操作符之间传递 Context
* @version: 1.0.0
*/
public class ContextAcrossOperators {
static class UserService {
Mono<String> getUserName(String userId) {
return Mono.deferContextual(ctx -> {
String requestId = ctx.getOrDefault("requestId", "unknow");
System.out.println("requestId: " + requestId + " - userId: " + userId);
return Mono.just("User_" + userId);
});
}
}
static class OrderService {
Mono<String> getOrderInfo(String orderId) {
return Mono.deferContextual(ctx -> {
String requestId = ctx.getOrDefault("requestId", "unknow");
String user = ctx.getOrDefault("user", "unknow");
System.out.println("requestId: " + requestId + " - user: " + user + " - orderId: " + orderId);
return Mono.just("Order_" + orderId + " _for_ " + user);
});
}
}
public static void main(String[] args) {
UserService userService = new UserService();
OrderService orderService = new OrderService();
// 在流处理过程中传递 Context
Mono<String> result = Mono.just("123")
.flatMap(userService::getUserName)
.flatMap(userName ->
Mono.deferContextual(ctx -> {
// 将用户信息存入 Context
return Mono.just("order456")
.flatMap(orderService::getOrderInfo)
.contextWrite(Context.of("user", userName));
})
)
.contextWrite(Context.of("requestId", "req-789"));
result.subscribe(System.out::println);
// 输出:
// Getting user 123 with requestId: req-789
// Getting order order456 for User_123 with requestId: req-789
// Order_order456_for_User_123
}
}
15.4.2 Context 与错误处理
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
public class ContextErrorHandling {
public static void main(String[] args) {
// 在错误处理中访问 Context
Mono<String> result = Mono.error(new RuntimeException("Something went wrong"))
.onErrorResume(e ->
Mono.deferContextual(ctx -> {
String requestId = ctx.getOrDefault("requestId", "unknown");
String errorMessage = "Error in request " + requestId + ": " + e.getMessage();
return Mono.just(errorMessage);
})
)
.contextWrite(Context.of("requestId", "req-123"));
result.subscribe(System.out::println); // 输出: Error in request req-123: Something went wrong
// 在 doOnError 中访问 Context
Mono<String> withDoOnError = Mono.just("data")
.flatMap(d -> Mono.error(new RuntimeException("Processing error")))
.doOnError(e ->
Mono.deferContextual(ctx -> {
String requestId = ctx.get("requestId");
System.err.println("Error occurred for request: " + requestId);
return Mono.empty();
}).subscribe() // 注意:需要订阅这个内部流
)
.onErrorResume(e -> Mono.just("fallback"))
.contextWrite(Context.of("requestId", "req-456"));
withDoOnError.subscribe(System.out::println);
// 输出:
// Error occurred for request: req-456
// fallback
}
}
15.4.3 嵌套 Context 处理
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
public class NestedContextExample {
public static void main(String[] args) {
// 模拟外部服务调用
Mono<String> externalServiceCall = Mono.deferContextual(ctx -> {
String authToken = ctx.get("authToken");
String requestId = ctx.get("requestId");
System.out.println("Calling service with token: " + authToken + ", requestId: " + requestId);
return Mono.just("Service response");
});
// 嵌套 Context 使用
Mono<String> result = Mono.just("process")
.flatMap(operation ->
Mono.deferContextual(outerCtx -> {
String userId = outerCtx.get("userId");
// 创建内部 Context(添加认证令牌)
Context innerContext = outerCtx.put("authToken", "token-for-" + userId);
return externalServiceCall
.contextWrite(innerContext)
.flatMap(serviceResponse ->
Mono.deferContextual(innerCtx -> {
String requestId = innerCtx.get("requestId");
return Mono.just("Processed: " + serviceResponse + " for user " + userId + " (request: " + requestId + ")");
})
);
})
)
.contextWrite(Context.of("userId", "alice", "requestId", "req-123"));
result.subscribe(System.out::println);
// 输出:
// Calling service with token: token-for-alice, requestId: req-123
// Processed: Service response for user alice (request: req-123)
}
}
15.5 Context在实际应用中的作用
15.5.1 Web应用中的请求上下文
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.web.server.ServerWebExchange;
public class WebContextExample {
// 模拟的 Web 处理器
static class WebHandler {
Mono<String> handleRequest(ServerWebExchange exchange) {
return Mono.just("request-data")
.flatMap(data -> processData(data))
.contextWrite(Context.of(
"exchange", exchange,
"requestId", exchange.getRequest().getId(),
"user", getCurrentUser(exchange)
));
}
Mono<String> processData(String data) {
return Mono.deferContextual(ctx -> {
ServerWebExchange exchange = ctx.get("exchange");
String requestId = ctx.get("requestId");
String user = ctx.get("user");
System.out.println("Processing data for user: " + user + ", requestId: " + requestId);
return Mono.just("Processed: " + data);
});
}
String getCurrentUser(ServerWebExchange exchange) {
// 模拟从 exchange 中获取用户信息
return "user-alice";
}
}
// 模拟的 ServerWebExchange
static class MockServerWebExchange {
private MockServerRequest request = new MockServerRequest();
public MockServerRequest getRequest() {
return request;
}
}
static class MockServerRequest {
public String getId() {
return "req-12345";
}
}
public static void main(String[] args) {
WebHandler handler = new WebHandler();
MockServerWebExchange exchange = new MockServerWebExchange();
handler.handleRequest(exchange)
.subscribe(System.out::println);
// 输出:
// Processing data for user: user-alice, requestId: req-12345
// Processed: request-data
}
}
15.5.2 分布式追踪与日志
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.util.UUID;
public class TracingContextExample {
// 模拟的分布式追踪器
static class Tracer {
void logEvent(String event, String requestId) {
System.out.println("[" + requestId + "] " + event);
}
Mono<String> startSpan(String name, String requestId) {
return Mono.fromCallable(() -> {
String spanId = UUID.randomUUID().toString();
logEvent("Started span: " + name + " (" + spanId + ")", requestId);
return spanId;
});
}
Mono<Void> endSpan(String spanId, String requestId) {
return Mono.fromRunnable(() ->
logEvent("Ended span: " + spanId, requestId)
);
}
}
public static void main(String[] args) {
Tracer tracer = new Tracer();
String requestId = UUID.randomUUID().toString();
Flux.range(1, 3)
.flatMap(i ->
Mono.deferContextual(ctx -> {
String currentRequestId = ctx.get("requestId");
return tracer.startSpan("process-item-" + i, currentRequestId)
.flatMap(spanId ->
processItem(i)
.doOnSuccess(result ->
tracer.endSpan(spanId, currentRequestId).subscribe()
)
.doOnError(error ->
tracer.endSpan(spanId, currentRequestId).subscribe()
)
);
})
)
.contextWrite(Context.of("requestId", requestId))
.subscribe();
// 输出示例:
// [a1b2c3d4...] Started span: process-item-1 (e5f6g7h8...)
// [a1b2c3d4...] Ended span: e5f6g7h8...
// [a1b2c3d4...] Started span: process-item-2 (i9j0k1l2...)
// [a1b2c3d4...] Ended span: i9j0k1l2...
// [a1b2c3d4...] Started span: process-item-3 (m3n4o5p6...)
// [a1b2c3d4...] Ended span: m3n4o5p6...
}
static Mono<String> processItem(int item) {
return Mono.just("Processed item " + item)
.delayElement(java.time.Duration.ofMillis(100));
}
}
15.5.3 权限验证与安全上下文
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
public class SecurityContextExample {
// 模拟的用户认证信息
static class Authentication {
private final String username;
private final String[] roles;
public Authentication(String username, String[] roles) {
this.username = username;
this.roles = roles;
}
public String getUsername() {
return username;
}
public String[] getRoles() {
return roles;
}
public boolean hasRole(String role) {
for (String r : roles) {
if (r.equals(role)) {
return true;
}
}
return false;
}
}
// 模拟的资源服务
static class ResourceService {
Mono<String> getProtectedResource(String resourceId) {
return Mono.deferContextual(ctx -> {
Authentication auth = ctx.get("auth");
if (!auth.hasRole("ADMIN")) {
return Mono.error(new SecurityException("Access denied"));
}
return Mono.just("Protected resource: " + resourceId);
});
}
}
public static void main(String[] args) {
ResourceService service = new ResourceService();
// 有权限的用户
Authentication adminAuth = new Authentication("alice", new String[]{"USER", "ADMIN"});
service.getProtectedResource("secret-data")
.contextWrite(Context.of("auth", adminAuth))
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
// 输出: Protected resource: secret-data
// 无权限的用户
Authentication userAuth = new Authentication("bob", new String[]{"USER"});
service.getProtectedResource("secret-data")
.contextWrite(Context.of("auth", userAuth))
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
// 输出: Error: Access denied
}
}
15.6 Context 工作原理与流程图
15.6.1 Context 在反应式流中的传递
Subscriber
Operator 1
Operator 2
Operator 3
subscribe()
向下游传递
向下游传递
contextWrite(ContextA)
处理数据 + ContextA
处理数据 + ContextA
最终结果
contextWrite(ContextB)
处理数据 + ContextB
最终结果
Subscriber
Operator 1
Operator 2
Operator 3
15.6.2 Context 的读取顺序
是
否
是
否
读取 Context 值
当前操作符有 Context?
读取当前 Context
向上游查找 Context
找到 Context?
返回找到的值
使用默认值或抛出异常
返回找到的值
15.7 最佳实践与注意事项
15.7.1 合理使用 Context
// 好的做法:将相关的上下文信息放在 Context 中
Context context = Context.of(
"requestId", "req-123",
"userId", "user-alice",
"traceId", "trace-456"
);
// 不好的做法:将不相关或过多的数据放在 Context 中
Context badContext = Context.of(
"requestId", "req-123",
"unrelatedData", largeObject, // 大对象不适合放在 Context 中
"config", configMap // 配置信息应该通过其他方式传递
);
15.7.2 Context 键命名规范
// 使用有意义的键名,避免冲突
public static final String KEY_REQUEST_ID = "com.example.requestId";
public static final String KEY_USER = "com.example.user";
Context context = Context.of(KEY_REQUEST_ID, "req-123", KEY_USER, "alice");
16.7.3 错误处理
Mono.deferContextual(ctx -> {
// 使用 getOrDefault 避免 NoSuchElementException
String value = ctx.getOrDefault("key", "default");
// 或者使用 hasKey 检查
if (ctx.hasKey("key")) {
return Mono.just(ctx.get("key"));
} else {
return Mono.error(new IllegalArgumentException("Missing key"));
}
});
15.8 注意事项
15.8.1 Context是不可变的
Context original = Context.of("key", "value");
Context updated = original.put("newKey", "newValue"); // 返回新对象
System.out.println(original.hasKey("newKey")); // false
System.out.println(updated.hasKey("newKey")); // true
15.8.2 Context 与线程安全
// Context 是线程安全的,可以在多线程环境中使用
Mono.fromCallable(() -> {
// 在另一个线程中访问 Context
return "result";
})
.flatMap(result ->
Mono.deferContextual(ctx -> {
String requestId = ctx.get("requestId"); // 安全访问
return Mono.just(result + " for " + requestId);
})
)
.contextWrite(Context.of("requestId", "req-123"))
.subscribeOn(Schedulers.parallel()) // 切换到并行调度器
.subscribe();
15.8.3 性能考虑
// 避免在热点路径中频繁创建大的 Context
Flux.range(1, 1000)
.contextWrite(Context.of("data", largeData)) // 大的数据对象
.subscribe(); // 这会在每个元素上创建 Context 副本
// 更好的做法:将大数据存储在外部,Context 中只存储引用
Flux.range(1, 1000)
.contextWrite(Context.of("dataRef", dataReference))
.subscribe();
15.8 总结
Reactor 的 Context API 提供了在反应式流中传递上下文信息的强大机制。通过本文的详细讲解,我们可以总结出以下几点:
- 核心特性:
- 基于订阅的上下文存储
- 不可变的数据结构
- 线程安全的访问机制
- 支持多层上下文嵌套
- 适用场景:
- 请求范围的数据传递(如请求ID、用户信息)
- 分布式追踪和日志记录
- 权限验证和安全上下文
- 跨操作符的配置传递
- 最佳实践:
- 合理选择存储在 Context 中的数据
- 使用有意义的键名避免冲突
- 使用 getOrDefault 或 hasKey 安全访问数据
- 避免在 Context 中存储大对象
- 注意事项:
- Context 是不可变的,每次修改都会创建新对象
- Context 的读取顺序是从内到外
- 需要考虑性能影响,避免过度使用
Context API 是 Reactor 框架中处理上下文传递的标准方式,掌握了它的使用能够更好地构建可维护、可追踪的反应式应用程序。
十六、总结与实践经验
16.1 Reactor核心优势
- 高效的资源利用:通过非阻塞IO和合理的线程模型,实现高并发处理
- 强大的背压处理:内置多种背压策略,防止系统过载
- 丰富的操作符:提供函数式、声明式的数据处理方式
- 与Spring生态完美集成:Spring WebFlux、Spring Data R2DBC等
- 良好的测试支持:StepVerifier等工具简化测试编写
16.2 实践经验总结
- 线程模型选择:
- CPU密集型任务使用
Schedulers.parallel()
- IO密集型任务使用
Schedulers.boundedElastic()
- 避免在响应式链中阻塞线程
- CPU密集型任务使用
- 背压策略选择:
- 实时性要求高:
onBackpressureDrop
- 数据完整性重要:
onBackpressureBuffer
- 平衡实时与完整:
onBackpressureLatest
- 实时性要求高:
- 错误处理原则:
- 使用
onErrorResume
提供降级方案 - 使用
retryWhen
实现智能重试 - 记录错误但保持流继续运行
- 使用
- 性能优化技巧:
- 合理使用
cache
避免重复计算 - 使用
window
或buffer
进行批量处理 - 控制
flatMap
的并发度
- 合理使用
- 调试与监控:
- 使用
.name()
和.metrics()
进行监控 - 使用
Hooks.onOperatorDebug()
调试复杂流 - 利用
StepVerifier
进行全面测试
- 使用
16.3 适用场景
- 高并发Web应用:特别是微服务架构中的API网关、聚合服务
- 实时数据处理:聊天应用、实时通知、实时分析
- 流式ETL管道:数据转换、 enrichment、聚合
- 响应式数据库访问:使用R2DBC进行非阻塞数据库操作
- 消息驱动架构:与Kafka、RabbitMQ等消息中间件集成
Reactor 框架为Java开发者提供了强大的响应式编程能力,通过合理运用其丰富的操作符和灵活的线程模型,可以构建出高性能、高弹性的现代应用程序。