电商支付系统中使用flink进行风控管理案例
目录
电商支付系统中使用flink进行风控管理案例
交易系统代码
@RestController @RequestMapping("/api/payment") public class PaymentController { @Autowired private KafkaTemplate<String, PaymentRequest> kafkaTemplate; @Autowired private RiskDecisionCache riskDecisionCache; @Autowired private AuditLogService auditLogService; @PostMapping public ResponseEntity<PaymentResponse> processPayment(@RequestBody PaymentRequest request) { // 1. 发送风控评估请求 String correlationId = UUID.randomUUID().toString(); request.setCorrelationId(correlationId); kafkaTemplate.send("payment.risk.evaluation", request); // 2. 同步等待风控决策(带超时) RiskDecision decision = riskDecisionCache.awaitDecision(correlationId, 5000); // 3. 处理决策结果 if (decision == null) { return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT) .body(new PaymentResponse("风控系统响应超时")); } if (decision.getDecision().equals("REJECT")) { auditLogService.logRejection(request, decision); return ResponseEntity.badRequest() .body(new PaymentResponse("支付被拒绝: " + decision.getReason())); } // 继续支付流程... return ResponseEntity.ok(new PaymentResponse("支付处理中")); } } @Service public class RiskDecisionCache { private final Map<String, RiskDecision> decisionMap = new ConcurrentHashMap<>(); private final Map<String, CountDownLatch> latchMap = new ConcurrentHashMap<>(); public void putDecision(String correlationId, RiskDecision decision) { decisionMap.put(correlationId, decision); CountDownLatch latch = latchMap.get(correlationId); if (latch != null) { latch.countDown(); } } public RiskDecision awaitDecision(String correlationId, long timeoutMillis) { try { CountDownLatch latch = new CountDownLatch(1); latchMap.put(correlationId, latch); if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) { return null; } return decisionMap.remove(correlationId); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { latchMap.remove(correlationId); } } } @KafkaListener(topics = "payment.risk.result") public void listenRiskDecision(RiskDecision decision) { 
riskDecisionCache.putDecision(decision.getCorrelationId(), decision); }
<!-- Payment-gateway dependencies. No <version> elements are given, so versions are presumably
     managed by a Spring Boot parent POM or BOM; confirm in the full pom.xml. -->
<dependencies>
    <!-- Web layer: @RestController, @PostMapping, ResponseEntity -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Messaging: KafkaTemplate and @KafkaListener -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
代码说明:
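- 支付网关通过Kafka发送风控评估请求并同步等待结果,使用correlationId关联请求和响应。注意:网关使用的主题(payment.risk.evaluation / payment.risk.result)必须与下文Flink作业的输入/输出主题(payment-events / risk-decisions)保持一致,且风控结果中必须携带correlationId,否则网关请求只会一直等到超时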
- RiskDecisionCache实现带超时的等待机制,避免支付流程长时间阻塞
- Kafka监听器异步接收风控决策并唤醒等待线程
- 拦截逻辑清晰分离,REJECT决策会记录审计日志并立即返回错误响应
- 完整Spring Boot + Kafka集成方案,包含必要的异常处理和资源清理
关键实现细节:
- 使用CountDownLatch实现请求-响应同步模式
- 线程安全的ConcurrentHashMap存储决策状态
- 5000ms超时机制防止系统挂起
- 审计日志服务记录完整拒绝信息
- 本示例通过消息体(request.setCorrelationId)携带correlationId实现请求关联,也可改为放在Kafka消息头中
风控代码
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
/**
 * A single payment as consumed from Kafka by the risk-control job.
 *
 * @param paymentId  identifier of the payment; used as the key for model scoring and decisions
 * @param userId     paying user; used as the key for per-user feature state
 * @param deviceId   device the payment was made from
 * @param ip         client IP address
 * @param amount     payment amount (currency/unit not specified here — TODO confirm)
 * @param timestamp  event time (epoch unit not specified here — presumably millis, confirm)
 * @param merchantId receiving merchant
 */
final case class PaymentEvent(
  paymentId: String,
  userId: String,
  deviceId: String,
  ip: String,
  amount: Double,
  timestamp: Long,
  merchantId: String
)
/**
 * Final risk verdict for one payment, published to Kafka.
 *
 * @param paymentId      payment the verdict applies to
 * @param riskScore      blended rule/model score computed by the decision engine
 * @param rulesTriggered names of the heuristic rules that fired
 * @param decision       one of "ALLOW", "REJECT" or "CHALLENGE"
 */
final case class RiskControlResult(
  paymentId: String,
  riskScore: Double,
  rulesTriggered: List[String],
  decision: String // ALLOW/REJECT/CHALLENGE
)
/**
 * Real-time payment risk-control job.
 *
 * Pipeline: Kafka payment events -> per-user feature computation -> rule engine ->
 * model scoring -> decision engine -> Kafka decisions.
 *
 * Optional arguments: `args(0)` = Kafka bootstrap servers, `args(1)` = consumer group id.
 */
object PaymentRiskControlJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Keyed state (per-user UserStats) is only restored consistently after a failure when
    // checkpointing is enabled.
    env.enableCheckpointing(60000L)

    // Kafka client settings shared by source and sink (example defaults; override via args).
    val kafkaProps = new java.util.Properties()
    kafkaProps.setProperty("bootstrap.servers", args.lift(0).getOrElse("localhost:9092"))
    kafkaProps.setProperty("group.id", args.lift(1).getOrElse("payment-risk-control"))

    // 1. Consume payment events from Kafka, partitioned by user so per-user state works.
    // NOTE(review): FlinkKafkaConsumer/FlinkKafkaProducer are deprecated in recent Flink
    // releases in favour of KafkaSource/KafkaSink.
    val paymentStream = env
      .addSource(new FlinkKafkaConsumer[PaymentEvent](
        "payment-events",
        new PaymentEventDeserializer(),
        kafkaProps))
      .keyBy(_.userId)

    // 2. Real-time feature computation (keyed by userId).
    val featureStream = paymentStream
      .process(new UserBehaviorFeatureCalculator())
      .name("feature_calculator")

    // 3. Rule engine. The output of `process` is a plain DataStream, so it must be re-keyed
    //    before a KeyedProcessFunction can be applied to it.
    val ruleStream = featureStream
      .keyBy(_._1.userId)
      .process(new RiskRuleEngine())
      .name("rule_engine")

    // 4. Model scoring, keyed by payment. Elements are tuples, so the key comes from the
    //    embedded PaymentEvent.
    val modelStream = ruleStream
      .keyBy(_._1.paymentId)
      .process(new RiskModelScorer())
      .name("model_scorer")

    // 5. Decision engine (also a KeyedProcessFunction, so it needs a keyed stream).
    val decisionStream = modelStream
      .keyBy(_._1.paymentId)
      .process(new DecisionEngine())
      .name("decision_engine")

    // 6. Publish decisions to Kafka. No alerting sink is wired up here yet.
    decisionStream
      .addSink(new FlinkKafkaProducer[RiskControlResult](
        "risk-decisions",
        new RiskControlResultSerializer(),
        kafkaProps))
      .name("kafka_sink")

    env.execute("Real-time Payment Risk Control")
  }
}
// User behaviour feature computation
/**
 * Maintains per-user payment statistics in keyed state and emits each event paired with
 * derived risk features.
 *
 * Emitted feature keys: "amount", "hourly_count", "daily_amount", "device_change", "ip_change".
 * NOTE(review): the ValueState has no TTL, so it grows with the number of distinct users;
 * consider configuring a StateTtlConfig.
 */
class UserBehaviorFeatureCalculator extends KeyedProcessFunction[String, PaymentEvent, (PaymentEvent, Map[String, Any])] {
  private var userPaymentStats: ValueState[UserStats] = _

  override def open(parameters: Configuration): Unit = {
    val statsDesc = new ValueStateDescriptor[UserStats]("userStats", classOf[UserStats])
    userPaymentStats = getRuntimeContext.getState(statsDesc)
  }

  override def processElement(
      event: PaymentEvent,
      ctx: KeyedProcessFunction[String, PaymentEvent, (PaymentEvent, Map[String, Any])]#Context,
      out: Collector[(PaymentEvent, Map[String, Any])]): Unit = {
    // ValueState.value() returns null until this key has been updated at least once.
    val previousStats = Option(userPaymentStats.value())
    val updatedStats = previousStats.getOrElse(UserStats.empty).update(event)
    userPaymentStats.update(updatedStats)

    // A user's first payment has no prior device/IP to compare with, so it is not a "change".
    // Comparing against UserStats.empty would flag every new user.
    val deviceChanged = previousStats.exists(_.lastDevice != event.deviceId)
    val ipChanged = previousStats.exists(_.lastIp != event.ip)

    // Real-time features
    val features: Map[String, Any] = Map(
      "amount" -> event.amount,
      "hourly_count" -> updatedStats.hourlyCount,
      "daily_amount" -> updatedStats.dailyAmount,
      "device_change" -> deviceChanged,
      "ip_change" -> ipChanged
    )
    out.collect((event, features))
  }
}
// Rule engine
/**
 * Scores a payment against fixed heuristic rules, using the features produced by
 * UserBehaviorFeatureCalculator.
 *
 * Emits `(event, ruleScore, triggeredRules)`. `ruleScore` is the sum of the weights of the
 * rules that fired, so it can exceed 1.0. `triggeredRules` lists the last-evaluated rule first.
 */
class RiskRuleEngine extends KeyedProcessFunction[String, (PaymentEvent, Map[String, Any]), (PaymentEvent, Double, List[String])] {
  override def processElement(
      value: (PaymentEvent, Map[String, Any]),
      ctx: KeyedProcessFunction[String, (PaymentEvent, Map[String, Any]), (PaymentEvent, Double, List[String])]#Context,
      out: Collector[(PaymentEvent, Double, List[String])]): Unit = {
    val (event, features) = value
    val (riskScore, triggeredRules) = RiskRuleEngine.evaluate(event, features)
    out.collect((event, riskScore, triggeredRules))
  }
}

object RiskRuleEngine {

  /**
   * Evaluates all rules for one payment.
   *
   * @return the summed weight of the rules that fired and their names (last-evaluated first)
   * @throws IllegalArgumentException if a required feature is missing or has the wrong type
   */
  def evaluate(event: PaymentEvent, features: Map[String, Any]): (Double, List[String]) = {
    // (rule name, weight, fired?) in evaluation order.
    val rules: List[(String, Double, Boolean)] = List(
      // Rule 1: large payment from a user with low daily spend.
      // NOTE(review): if UserStats.update adds event.amount to dailyAmount, then dailyAmount is
      // already > 10000 whenever amount > 10000 and this rule can never fire. Confirm.
      ("BIG_AMOUNT_NEW", 0.3,
        event.amount > 10000 && numericFeature(features, "daily_amount") < 5000),
      // Rule 2: high payment frequency within the hour.
      ("HIGH_FREQUENCY", 0.4, numericFeature(features, "hourly_count") > 10),
      // Rule 3: both device and IP differ from the user's previous payment.
      ("DEVICE_IP_CHANGE", 0.5,
        booleanFeature(features, "device_change") && booleanFeature(features, "ip_change"))
    )
    rules.foldLeft((0.0, List.empty[String])) {
      case ((score, fired), (name, weight, triggered)) =>
        if (triggered) (score + weight, name :: fired) else (score, fired)
    }
  }

  /** Reads a numeric feature regardless of its boxed type (Int, Long, Double, ...). */
  private def numericFeature(features: Map[String, Any], key: String): Double =
    features.get(key) match {
      case Some(n: java.lang.Number) => n.doubleValue
      case other =>
        throw new IllegalArgumentException(s"Feature '$key' is missing or not numeric: $other")
    }

  /** Reads a boolean feature. */
  private def booleanFeature(features: Map[String, Any], key: String): Boolean =
    features.get(key) match {
      case Some(b: Boolean) => b
      case other =>
        throw new IllegalArgumentException(s"Feature '$key' is missing or not boolean: $other")
    }
}
// Decision engine
/**
 * Blends the rule score and the model score into a final risk score and maps it to a verdict:
 *
 *   finalScore = ruleScore * ruleWeight + modelScore * modelWeight
 *   "REJECT" if finalScore > rejectThreshold,
 *   "CHALLENGE" if finalScore > challengeThreshold,
 *   otherwise "ALLOW".
 *
 * The defaults reproduce the original fixed 60/40 weighting and the 0.8 / 0.5 thresholds.
 *
 * @param ruleWeight         weight applied to the rule-engine score
 * @param modelWeight        weight applied to the model score
 * @param rejectThreshold    scores strictly above this are rejected
 * @param challengeThreshold scores strictly above this (and not rejected) are challenged
 */
class DecisionEngine(
    ruleWeight: Double = 0.6,
    modelWeight: Double = 0.4,
    rejectThreshold: Double = 0.8,
    challengeThreshold: Double = 0.5)
    extends KeyedProcessFunction[String, (PaymentEvent, Double, Double), RiskControlResult] {

  require(challengeThreshold <= rejectThreshold,
    s"challengeThreshold ($challengeThreshold) must not exceed rejectThreshold ($rejectThreshold)")

  override def processElement(
      value: (PaymentEvent, Double, Double),
      ctx: KeyedProcessFunction[String, (PaymentEvent, Double, Double), RiskControlResult]#Context,
      out: Collector[RiskControlResult]): Unit = {
    val (event, ruleScore, modelScore) = value
    val finalScore = ruleScore * ruleWeight + modelScore * modelWeight
    val decision =
      if (finalScore > rejectThreshold) "REJECT"
      else if (finalScore > challengeThreshold) "CHALLENGE"
      else "ALLOW"
    out.collect(RiskControlResult(
      event.paymentId,
      finalScore,
      // TODO: the input tuple has no slot for the triggered rule names, so they are lost here;
      // they should be threaded through from RiskRuleEngine.
      Nil,
      decision
    ))
  }
}
代码说明:
- 完整数据流:Kafka消费支付事件 → 特征计算 → 规则引擎 → 模型评分 → 决策引擎 → Kafka输出结果
- 实时特征计算:维护用户支付状态(ValueState),计算每小时交易次数、每日金额、设备/IP变更等特征
- 规则引擎实现:包含大额交易、高频交易、设备变更等典型风控规则,输出风险分值和触发规则列表
- 决策融合:结合规则评分(60%)和模型评分(40%)做出最终拦截决策(REJECT/CHALLENGE/ALLOW)
- 状态管理:使用Flink ValueState保存用户历史行为数据;在启用检查点(checkpoint)后,可在故障恢复时获得精确一次(exactly-once)的状态一致性
- 生产级配置:生产部署还需补充Kafka序列化器(PaymentEventDeserializer / RiskControlResultSerializer)、Kafka连接参数、检查点配置、状态TTL等组件;本示例对这些做了简化
拦截实现原理:
- 支付网关在收到支付请求时,同步等待风控决策(通常通过Kafka+请求ID关联)
- 当决策为REJECT时,支付网关直接返回"支付被拒绝"并记录审计日志
- 当决策为CHALLENGE时,应触发二次验证(短信/人脸识别);上文网关示例代码尚未实现该分支,需自行补充
- 目标是在100ms内返回决策结果,以满足支付流程的实时性要求;实际延迟取决于部署与负载,并非代码本身的保证(网关侧等待上限为5000ms)
扩展建议:
- 增加CEP模块检测复杂模式(如短时间内多账户同设备支付)
- 集成Redis查询黑名单等外部数据源
- 添加机器学习模型服务调用(PMML/TensorFlow Serving)
- 实现动态规则更新机制(通过广播流)