目录

电商支付系统中使用Flink进行风控管理案例

电商支付系统中使用Flink进行风控管理案例

交易系统代码



/**
 * REST entry point for payments that gates each request on an asynchronous risk decision.
 *
 * <p>The request is published to Kafka topic {@code payment.risk.evaluation}, tagged with a
 * fresh correlation id. The handling thread then blocks (up to {@link #RISK_DECISION_TIMEOUT_MS})
 * in {@link RiskDecisionCache#awaitDecision} until a decision with that id is delivered.
 */
@RestController
@RequestMapping("/api/payment")
public class PaymentController {

    /** Maximum time, in milliseconds, the request thread waits for a risk decision. */
    private static final long RISK_DECISION_TIMEOUT_MS = 5000;

    @Autowired
    private KafkaTemplate<String, PaymentRequest> kafkaTemplate;

    @Autowired
    private RiskDecisionCache riskDecisionCache;

    @Autowired
    private AuditLogService auditLogService;

    /**
     * Submits the payment for risk evaluation and maps the decision to an HTTP response.
     *
     * @param request payment payload; its correlation id is overwritten with a fresh UUID
     * @return 408 if no decision arrived in time, 400 on REJECT, 403 on CHALLENGE,
     *         otherwise 200 and the payment flow continues
     * @throws NullPointerException if the delivered decision carries no verdict
     */
    @PostMapping
    public ResponseEntity<PaymentResponse> processPayment(@RequestBody PaymentRequest request) {
        // 1. Publish the evaluation request, tagged so the reply can be matched to this call.
        String correlationId = UUID.randomUUID().toString();
        request.setCorrelationId(correlationId);
        kafkaTemplate.send("payment.risk.evaluation", request);

        // 2. Block until the decision arrives; null means timeout (or interruption).
        RiskDecision decision = riskDecisionCache.awaitDecision(correlationId, RISK_DECISION_TIMEOUT_MS);

        // 3. Act on the decision.
        if (decision == null) {
            // NOTE(review): 408 signals a slow *client*; 503/504 would describe an upstream
            // timeout better, but changing it alters the API contract — confirm with clients.
            return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
                   .body(new PaymentResponse("风控系统响应超时"));
        }

        // Fail closed on a malformed decision rather than letting it fall through to "allow".
        String verdict = Objects.requireNonNull(decision.getDecision(),
                "risk decision has no verdict, correlationId=" + correlationId);

        if ("REJECT".equals(verdict)) {
            auditLogService.logRejection(request, decision);
            return ResponseEntity.badRequest()
                   .body(new PaymentResponse("支付被拒绝: " + decision.getReason()));
        }

        if ("CHALLENGE".equals(verdict)) {
            // A CHALLENGE must not continue as an ordinary payment: the client has to complete
            // step-up verification first.
            // NOTE(review): status code and message are placeholders — align with the client contract.
            return ResponseEntity.status(HttpStatus.FORBIDDEN)
                   .body(new PaymentResponse("需要二次验证: " + decision.getReason()));
        }

        // Continue with the payment flow...
        return ResponseEntity.ok(new PaymentResponse("支付处理中"));
    }
}


/**
 * Rendezvous between request threads waiting for a risk decision and the Kafka listener that
 * delivers decisions, keyed by correlation id.
 *
 * <p>Each correlation id maps to a single {@link CompletableFuture}. It is created atomically by
 * whichever side touches the id first ({@code computeIfAbsent}), so a decision delivered before
 * the waiter has registered is not lost. The waiter always removes its slot when it stops waiting.
 */
@Service
public class RiskDecisionCache {
    // correlationId -> decision slot, shared by the waiting thread and the listener thread.
    private final Map<String, CompletableFuture<RiskDecision>> pending = new ConcurrentHashMap<>();

    /**
     * Delivers a decision and wakes the thread waiting on {@code correlationId}, if any.
     * If nobody is waiting yet, the decision is kept until the waiter arrives.
     * Only the first decision delivered for an id is kept; later ones are ignored.
     *
     * @param correlationId id copied from the original request; a null id is ignored because it
     *                      can never match a waiter (and ConcurrentHashMap rejects null keys)
     * @param decision      the decision to hand over
     */
    public void putDecision(String correlationId, RiskDecision decision) {
        if (correlationId == null) {
            return;
        }
        // NOTE(review): a decision that arrives after its waiter timed out stays in the map
        // (the original implementation leaked it too); consider TTL-based eviction.
        pending.computeIfAbsent(correlationId, id -> new CompletableFuture<>()).complete(decision);
    }

    /**
     * Blocks until the decision for {@code correlationId} is delivered or the timeout elapses.
     *
     * @param correlationId id of the request being waited on (must not be null)
     * @param timeoutMillis maximum wait in milliseconds
     * @return the decision, or null on timeout or interruption (the interrupt flag is restored)
     */
    public RiskDecision awaitDecision(String correlationId, long timeoutMillis) {
        CompletableFuture<RiskDecision> slot =
                pending.computeIfAbsent(correlationId, id -> new CompletableFuture<>());
        try {
            return slot.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            // Slots are only ever completed normally by this class; treat as "no decision".
            return null;
        } finally {
            // Conditional remove: only drop the slot this call was waiting on.
            pending.remove(correlationId, slot);
        }
    }
}

/**
 * Kafka consumer for topic {@code payment.risk.result}. It hands every incoming decision to the
 * cache under the correlation id the decision carries, which releases the request waiting on it.
 *
 * <p>NOTE(review): this is a fragment. It must live in a Spring bean that declares
 * {@code riskDecisionCache}. The Flink job in this document writes to "risk-decisions", not to
 * this topic, so the names must be aligned.
 */
@KafkaListener(topics = "payment.risk.result")
public void listenRiskDecision(RiskDecision decision) {
    String correlationId = decision.getCorrelationId();
    riskDecisionCache.putDecision(correlationId, decision);
}


<!-- Dependencies for the payment gateway code above: Spring MVC (@RestController,
     ResponseEntity) and Spring for Apache Kafka (KafkaTemplate, @KafkaListener). -->
<!-- NOTE(review): no <version> elements are given; presumably they are managed by
     spring-boot-starter-parent or a Spring Boot BOM in the enclosing pom - confirm.
     The Flink job's dependencies (Flink Scala streaming API, Kafka connector) are not listed;
     presumably it is built as a separate project. -->
<dependencies>
    <!-- Web layer: @RestController / @PostMapping / ResponseEntity -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Kafka producer/consumer support: KafkaTemplate and @KafkaListener -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

代码说明:

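  1. 支付网关通过Kafka发送风控评估请求并同步等待结果,使用correlationId关联请求和响应(注意:网关使用的主题 payment.risk.evaluation / payment.risk.result 必须与风控作业的输入/输出主题一致,而下文Flink作业示例使用的是 payment-events / risk-decisions,部署时需统一)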
  2. RiskDecisionCache实现带超时的等待机制,避免支付流程长时间阻塞
  3. Kafka监听器异步接收风控决策并唤醒等待线程
  4. 拦截逻辑清晰分离,REJECT决策会记录审计日志并立即返回错误响应
  5. Spring Boot + Kafka集成示例(PaymentRequest、RiskDecision等类定义从略,Kafka监听方法需放在Spring Bean中),包含超时与中断处理以及等待状态的清理

关键实现细节:

  • 使用CountDownLatch实现请求-响应同步模式
  • 线程安全的ConcurrentHashMap存储决策状态
  • 5000ms超时机制防止系统挂起
  • 审计日志服务记录完整拒绝信息
  • 本例通过消息体携带correlationId(request.setCorrelationId)实现请求关联,也可改用Kafka消息头传递

风控代码



import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
 * A single payment as consumed by the risk job from the `payment-events` Kafka topic.
 *
 * @param paymentId  identifier of the payment
 * @param userId     paying user; the job keys its per-user state by this field
 * @param deviceId   device the payment was made from
 * @param ip         client IP address
 * @param amount     payment amount (currency/units are not specified here)
 * @param timestamp  event time — NOTE(review): presumably epoch millis; confirm with the producer
 * @param merchantId receiving merchant
 */
case class PaymentEvent(paymentId: String, userId: String, deviceId: String, ip: String,
                        amount: Double, timestamp: Long, merchantId: String)

/**
 * Outcome of risk evaluation for one payment, published to the `risk-decisions` Kafka topic.
 *
 * @param paymentId      payment the decision refers to
 * @param riskScore      final (blended) risk score
 * @param rulesTriggered names of the rules that fired
 * @param decision       verdict: "ALLOW", "REJECT" or "CHALLENGE"
 *
 * NOTE(review): this record carries no correlationId, whereas the Java gateway matches replies by
 * `RiskDecision.getCorrelationId`; either add the field or make paymentId carry that value.
 */
case class RiskControlResult(paymentId: String, riskScore: Double,
                             rulesTriggered: List[String], decision: String)

/**
 * Flink job wiring: payment events from Kafka -> per-user features -> rule engine ->
 * model scoring -> final decision -> Kafka.
 *
 * NOTE(review): `kafkaProps`, `PaymentEventDeserializer`, `RiskControlResultSerializer` and
 * `RiskModelScorer` are referenced but not defined in this snippet. `FlinkKafkaConsumer` and
 * `FlinkKafkaProducer` also need imports from `org.apache.flink.streaming.connectors.kafka`.
 */
object PaymentRiskControlJob {
  /** Checkpoint interval in milliseconds; placeholder value, tune for the deployment. */
  private val CheckpointIntervalMs = 60000L

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Without checkpointing, the keyed ValueState (per-user stats) is lost on failure and no
    // delivery guarantee applies.
    env.enableCheckpointing(CheckpointIntervalMs)

    // 1. Consume payment events and partition them by user so per-user state is co-located.
    val paymentStream = env
      .addSource(new FlinkKafkaConsumer[PaymentEvent](
        "payment-events",
        new PaymentEventDeserializer(),
        kafkaProps))
      .keyBy(_.userId)

    // 2. Real-time feature calculation (keyed by userId).
    val featureStream = paymentStream
      .process(new UserBehaviorFeatureCalculator())
      .name("feature_calculator")

    // 3. Rule engine. process() yields an unkeyed stream, and RiskRuleEngine is a
    //    KeyedProcessFunction with a String key, so re-key by the embedded event's userId.
    val ruleStream = featureStream
      .keyBy(_._1.userId)
      .process(new RiskRuleEngine())
      .name("rule_engine")

    // 4. Model scoring, keyed by payment. Elements are (PaymentEvent, ruleScore, rules) tuples,
    //    so the key is read from the embedded event.
    val modelStream = ruleStream
      .keyBy(_._1.paymentId)
      .process(new RiskModelScorer())
      .name("model_scorer")

    // 5. Decision engine, also a KeyedProcessFunction, so key the stream again.
    //    NOTE(review): assumes RiskModelScorer emits (PaymentEvent, Double, Double), the input
    //    type DecisionEngine consumes; confirm once RiskModelScorer is defined.
    val decisionStream = modelStream
      .keyBy(_._1.paymentId)
      .process(new DecisionEngine())
      .name("decision_engine")

    // 6. Publish decisions to Kafka (no alerting sink is wired up here).
    //    NOTE(review): the Java gateway listens on "payment.risk.result"; align topic names.
    decisionStream
      .addSink(new FlinkKafkaProducer[RiskControlResult](
        "risk-decisions",
        new RiskControlResultSerializer(),
        kafkaProps))
      .name("kafka_sink")

    env.execute("Real-time Payment Risk Control")
  }
}

// 用户行为特征计算
/**
 * Per-user feature extraction for the risk pipeline.
 *
 * Keeps a running `UserStats` per key in keyed `ValueState`; the job keys this operator by
 * `userId`. It folds each payment into those stats and emits the payment together with a
 * feature map whose keys are "amount", "hourly_count", "daily_amount", "device_change" and
 * "ip_change".
 *
 * "device_change" / "ip_change" compare the payment with the stats stored before it arrived.
 * Both are `false` when nothing has been stored for the user yet.
 */
class UserBehaviorFeatureCalculator extends KeyedProcessFunction[String, PaymentEvent, (PaymentEvent, Map[String, Any])] {
  // Handle to the per-key UserStats; initialised in open(), once the runtime context exists.
  private var userPaymentStats: ValueState[UserStats] = _

  override def open(parameters: Configuration): Unit = {
    val statsDesc = new ValueStateDescriptor[UserStats]("userStats", classOf[UserStats])
    userPaymentStats = getRuntimeContext.getState(statsDesc)
  }

  override def processElement(
    event: PaymentEvent,
    ctx: KeyedProcessFunction[String, PaymentEvent, (PaymentEvent, Map[String, Any])]#Context,
    out: Collector[(PaymentEvent, Map[String, Any])]): Unit = {

    // None when nothing has been stored for this user yet (value() returned null).
    val previousStats = Option(userPaymentStats.value())
    val updatedStats = previousStats.getOrElse(UserStats.empty).update(event)
    userPaymentStats.update(updatedStats)

    // Only a real previous payment can reveal a change. Comparing against the UserStats.empty
    // placeholder would report a device *and* IP change on a user's first payment (unless the
    // placeholder happens to hold the same values), which on its own fires DEVICE_IP_CHANGE.
    val deviceChanged = previousStats.exists(_.lastDevice != event.deviceId)
    val ipChanged = previousStats.exists(_.lastIp != event.ip)

    val features: Map[String, Any] = Map(
      "amount" -> event.amount,
      "hourly_count" -> updatedStats.hourlyCount,
      "daily_amount" -> updatedStats.dailyAmount,
      "device_change" -> deviceChanged,
      "ip_change" -> ipChanged
    )

    out.collect((event, features))
  }
}

// 规则引擎实现
/**
 * Rule-based risk scoring.
 *
 * Evaluates fixed rules against a payment and its feature map. For each input it emits
 * (event, sum of the scores of the rules that fired, names of those rules in evaluation order).
 * The defaults reproduce the original hard-coded thresholds.
 *
 * @param bigAmountThreshold     BIG_AMOUNT_NEW needs a payment amount above this (default 10000)
 * @param lowDailyAmount         ...and "daily_amount" below this (default 5000)
 * @param highFrequencyThreshold HIGH_FREQUENCY fires when "hourly_count" exceeds this (default 10)
 */
class RiskRuleEngine(
  bigAmountThreshold: Double = 10000,
  lowDailyAmount: Double = 5000,
  highFrequencyThreshold: Int = 10
) extends KeyedProcessFunction[String, (PaymentEvent, Map[String, Any]), (PaymentEvent, Double, List[String])] {

  override def processElement(
    value: (PaymentEvent, Map[String, Any]),
    ctx: KeyedProcessFunction[String, (PaymentEvent, Map[String, Any]), (PaymentEvent, Double, List[String])]#Context,
    out: Collector[(PaymentEvent, Double, List[String])]): Unit = {

    val (event, features) = value

    // (fired?, score contribution, rule name), in evaluation order.
    val rules = List(
      // Rule 1: large payment from a user with little spending today.
      // NOTE(review): if UserStats.update folds the current payment into dailyAmount, then with
      // the default thresholds daily_amount >= amount > 10000 > 5000 and this rule can never
      // fire; comparing prior spending (daily_amount - amount) may be what was intended.
      (event.amount > bigAmountThreshold && numericFeature(features, "daily_amount") < lowDailyAmount,
        0.3, "BIG_AMOUNT_NEW"),
      // Rule 2: high payment frequency.
      (numericFeature(features, "hourly_count") > highFrequencyThreshold, 0.4, "HIGH_FREQUENCY"),
      // Rule 3: both device and IP changed since the previous payment.
      (booleanFeature(features, "device_change") && booleanFeature(features, "ip_change"),
        0.5, "DEVICE_IP_CHANGE")
    )

    val fired = rules.filter(_._1)
    out.collect((event, fired.map(_._2).sum, fired.map(_._3)))
  }

  /** Reads a numeric feature whatever its boxed type (Int, Long, Double, ...); missing key throws. */
  private def numericFeature(features: Map[String, Any], key: String): Double =
    features(key) match {
      case n: java.lang.Number => n.doubleValue
      case other => throw new IllegalArgumentException(s"feature '$key' is not numeric: $other")
    }

  /** Reads a boolean feature; missing key throws NoSuchElementException. */
  private def booleanFeature(features: Map[String, Any], key: String): Boolean =
    features(key) match {
      case b: java.lang.Boolean => b.booleanValue
      case other => throw new IllegalArgumentException(s"feature '$key' is not boolean: $other")
    }
}

// 决策引擎实现
/**
 * Blends the rule score and the model score into a final score and maps it to a verdict.
 *
 * Input tuple: (event, ruleScore, modelScore).
 * finalScore = ruleScore * 0.6 + modelScore * 0.4.
 * Verdict: "REJECT" when finalScore > 0.8, "CHALLENGE" when > 0.5, otherwise "ALLOW".
 *
 * NOTE(review): a NaN score fails both comparisons and yields "ALLOW" (fail-open). The rules that
 * fired upstream are not part of the input, so rulesTriggered is always empty.
 */
class DecisionEngine extends KeyedProcessFunction[String, (PaymentEvent, Double, Double), RiskControlResult] {
  override def processElement(
    value: (PaymentEvent, Double, Double),
    ctx: KeyedProcessFunction[String, (PaymentEvent, Double, Double), RiskControlResult]#Context,
    out: Collector[RiskControlResult]): Unit = {

    val payment = value._1
    val blended = value._2 * 0.6 + value._3 * 0.4

    val verdict =
      if (blended > 0.8) "REJECT"
      else if (blended > 0.5) "CHALLENGE"
      else "ALLOW"

    // Upstream rule names are not carried in the input tuple, so none can be reported here.
    out.collect(RiskControlResult(
      paymentId = payment.paymentId,
      riskScore = blended,
      rulesTriggered = Nil,
      decision = verdict))
  }
}

代码说明:

  1. 完整数据流:Kafka消费支付事件 → 特征计算 → 规则引擎 → 模型评分 → 决策引擎 → Kafka输出结果
  2. 实时特征计算:维护用户支付状态(ValueState),计算每小时交易次数、每日金额、设备/IP变更等特征
  3. 规则引擎实现:包含大额交易、高频交易、设备与IP同时变更等典型风控规则,输出风险分值和触发规则列表
  4. 决策融合:结合规则评分(60%)和模型评分(40%)得到最终分值,分值>0.8为REJECT、>0.5为CHALLENGE、否则为ALLOW
  5. 状态管理:使用Flink ValueState保存用户历史行为数据;需开启检查点(checkpoint)才能在故障后恢复状态,端到端精确一次语义还要求Kafka Sink以事务(EXACTLY_ONCE)模式写出
  6. 生产化配置:Kafka连接属性(kafkaProps)、序列化器、模型评分算子(RiskModelScorer)及检查点等组件在示例中被简化或未完整给出,生产环境需补全

拦截实现原理:

  1. 支付网关在收到支付请求时,同步等待风控决策(通常通过Kafka+请求ID关联)
  2. 当决策为REJECT时,支付网关直接返回"支付被拒绝"并记录审计日志
  3. 当决策为CHALLENGE时,应触发二次验证(短信/人脸识别),而不是按普通支付继续处理
  4. 决策结果应尽快返回(目标通常为百毫秒级)以满足支付流程的实时性要求;网关侧最多等待5000ms,超时即返回错误响应

扩展建议:

  1. 增加CEP模块检测复杂模式(如短时间内多账户同设备支付)
  2. 集成Redis查询黑名单等外部数据源
  3. 添加机器学习模型服务调用(PMML/TensorFlow Serving)
  4. 实现动态规则更新机制(通过广播流)