目录

分布式方案-一-分布式锁的四大实现方式

分布式方案 一 分布式锁的四大实现方式

Java分布式锁实现方式详解

什么是分布式锁

分布式锁是在分布式系统中,用于控制多个进程/节点对共享资源的访问的一种同步机制。与单机环境下的锁不同,分布式锁需要在多个节点之间协调,确保在任意时刻只有一个节点能够获得锁。

分布式锁的特性要求

  • 互斥性:在任意时刻,只有一个客户端能持有锁
  • 安全性:锁只能被持有该锁的客户端删除,不能被其他客户端删除
  • 避免死锁:获取锁的客户端因为某些原因而没有释放锁,其他客户端再也无法获取锁
  • 容错性:只要大部分节点正常运行,客户端就可以加锁和解锁

基于数据库的分布式锁

实现原理

利用数据库的唯一索引特性来实现分布式锁。通过在数据库中插入一条记录来获取锁,删除记录来释放锁。

数据库表结构

CREATE TABLE distributed_lock (
    id INT PRIMARY KEY AUTO_INCREMENT,
    lock_name VARCHAR(64) NOT NULL COMMENT '锁名称',
    lock_value VARCHAR(64) NOT NULL COMMENT '锁值',
    expire_time TIMESTAMP NOT NULL COMMENT '过期时间',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_lock_name (lock_name)
);

Java实现示例

1. 基于唯一索引的实现
import java.sql.*;
import java.util.concurrent.TimeUnit;

public class DatabaseDistributedLock {
    
    private Connection connection;
    private String lockName;
    private String lockValue;
    private long expireTime;
    
    public DatabaseDistributedLock(Connection connection, String lockName) {
        this.connection = connection;
        this.lockName = lockName;
        this.lockValue = Thread.currentThread().getName() + "-" + System.currentTimeMillis();
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间(秒)
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout) {
        long startTime = System.currentTimeMillis();
        long timeoutMillis = timeout * 1000;
        
        while (System.currentTimeMillis() - startTime < timeoutMillis) {
            try {
                // 尝试插入锁记录
                String sql = "INSERT INTO distributed_lock (lock_name, lock_value, expire_time) VALUES (?, ?, ?)";
                PreparedStatement stmt = connection.prepareStatement(sql);
                stmt.setString(1, lockName);
                stmt.setString(2, lockValue);
                stmt.setTimestamp(3, new Timestamp(System.currentTimeMillis() + 30000)); // 30秒过期
                
                int result = stmt.executeUpdate();
                if (result > 0) {
                    return true; // 获取锁成功
                }
            } catch (SQLException e) {
                // 插入失败,说明锁已被其他线程持有
                if (e.getErrorCode() == 1062) { // MySQL唯一键冲突错误码
                    // 检查锁是否过期
                    cleanExpiredLock();
                }
            }
            
            try {
                Thread.sleep(100); // 等待100ms后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        
        return false;
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            String sql = "DELETE FROM distributed_lock WHERE lock_name = ? AND lock_value = ?";
            PreparedStatement stmt = connection.prepareStatement(sql);
            stmt.setString(1, lockName);
            stmt.setString(2, lockValue);
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 清理过期锁
     */
    private void cleanExpiredLock() {
        try {
            String sql = "DELETE FROM distributed_lock WHERE lock_name = ? AND expire_time < ?";
            PreparedStatement stmt = connection.prepareStatement(sql);
            stmt.setString(1, lockName);
            stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

优缺点分析

优点

  • 实现简单,易于理解
  • 利用数据库事务特性保证一致性
  • 不需要额外的中间件

缺点

  • 性能较差,数据库压力大
  • 单点故障风险
  • 锁的粒度较粗

基于Redis的分布式锁

实现原理

利用Redis的原子性操作来实现分布式锁。主要使用SET命令的NX(Not eXists)和EX(EXpire)参数。

Java实现示例

1. 基于Jedis的简单实现
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedisDistributedLock {
    
    private Jedis jedis;
    private String lockKey;
    private String lockValue;
    private int expireTime;
    
    public RedisDistributedLock(Jedis jedis, String lockKey, int expireTime) {
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockValue = Thread.currentThread().getName() + "-" + System.currentTimeMillis();
        this.expireTime = expireTime;
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间(毫秒)
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout) {
        long startTime = System.currentTimeMillis();
        
        while (System.currentTimeMillis() - startTime < timeout) {
            // 使用SET命令的NX和EX参数实现原子操作
            SetParams params = SetParams.setParams().nx().ex(expireTime);
            String result = jedis.set(lockKey, lockValue, params);
            
            if ("OK".equals(result)) {
                return true; // 获取锁成功
            }
            
            try {
                Thread.sleep(100); // 等待100ms后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        
        return false;
    }
    
    /**
     * 释放锁(使用Lua脚本保证原子性)
     */
    public void unlock() {
        String luaScript = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
        
        jedis.eval(luaScript, 1, lockKey, lockValue);
    }
    
    /**
     * 锁续期
     */
    public boolean renewLock() {
        String luaScript = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('expire', KEYS[1], ARGV[2]) " +
            "else " +
            "    return 0 " +
            "end";
        
        Object result = jedis.eval(luaScript, 1, lockKey, lockValue, String.valueOf(expireTime));
        return "1".equals(result.toString());
    }
}
2. 基于Redisson的实现
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedissonDistributedLock {
    
    private RedissonClient redissonClient;
    
    public RedissonDistributedLock() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        this.redissonClient = Redisson.create(config);
    }
    
    /**
     * 获取锁并执行业务逻辑
     */
    public void executeWithLock(String lockKey, Runnable task) {
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            // 尝试获取锁,最多等待10秒,锁自动释放时间为30秒
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                System.out.println("获取锁成功:" + lockKey);
                task.run(); // 执行业务逻辑
            } else {
                System.out.println("获取锁失败:" + lockKey);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
                System.out.println("释放锁:" + lockKey);
            }
        }
    }
    
    public void shutdown() {
        redissonClient.shutdown();
    }
}

优缺点分析

优点

  • 性能高,支持高并发
  • 支持锁过期时间,避免死锁
  • 实现相对简单

缺点

  • Redis单点故障风险
  • 时钟偏移可能导致锁失效
  • 需要考虑锁续期问题

基于ZooKeeper的分布式锁

实现原理

利用ZooKeeper的临时顺序节点特性来实现分布式锁。客户端在指定路径下创建临时顺序节点,序号最小的节点获得锁。

Java实现示例

1. 基于Apache Curator的实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class ZooKeeperDistributedLock {
    
    private CuratorFramework client;
    private InterProcessMutex lock;
    
    public ZooKeeperDistributedLock(String connectString, String lockPath) {
        // 创建ZooKeeper客户端
        this.client = CuratorFrameworkFactory.newClient(
            connectString, 
            new ExponentialBackoffRetry(1000, 3)
        );
        this.client.start();
        
        // 创建分布式锁
        this.lock = new InterProcessMutex(client, lockPath);
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            return lock.acquire(timeout, unit);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 关闭客户端
     */
    public void close() {
        client.close();
    }
}
2. 手动实现ZooKeeper分布式锁
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CustomZooKeeperLock implements Watcher {
    
    private ZooKeeper zooKeeper;
    private String lockPath;
    private String currentPath;
    private String waitPath;
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    
    public CustomZooKeeperLock(String connectString, String lockPath) throws IOException, InterruptedException {
        this.lockPath = lockPath;
        
        // 创建ZooKeeper连接
        zooKeeper = new ZooKeeper(connectString, 5000, this);
        connectLatch.await();
        
        // 创建根节点
        Stat stat = zooKeeper.exists(lockPath, false);
        if (stat == null) {
            zooKeeper.create(lockPath, "".getBytes(), 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    
    /**
     * 获取锁
     */
    public boolean tryLock() {
        try {
            // 创建临时顺序节点
            currentPath = zooKeeper.create(lockPath + "/lock-", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            
            // 获取所有子节点并排序
            List<String> children = zooKeeper.getChildren(lockPath, false);
            Collections.sort(children);
            
            String thisNode = currentPath.substring((lockPath + "/").length());
            int index = children.indexOf(thisNode);
            
            if (index == 0) {
                // 当前节点是最小的,获取锁成功
                return true;
            } else {
                // 监听前一个节点
                waitPath = lockPath + "/" + children.get(index - 1);
                Stat stat = zooKeeper.exists(waitPath, true);
                if (stat == null) {
                    // 前一个节点不存在,重新尝试获取锁
                    return tryLock();
                } else {
                    // 等待前一个节点删除
                    waitLatch.await();
                    return tryLock();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            zooKeeper.delete(currentPath, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectLatch.countDown();
        }
        
        if (event.getType() == Event.EventType.NodeDeleted && 
            event.getPath().equals(waitPath)) {
            waitLatch.countDown();
        }
    }
    
    public void close() throws InterruptedException {
        zooKeeper.close();
    }
}

优缺点分析

优点

  • 可靠性高,支持集群
  • 避免死锁,临时节点自动删除
  • 支持阻塞等待

缺点

  • 性能相对较低
  • 复杂度较高
  • 依赖ZooKeeper集群

基于Etcd的分布式锁

实现原理

利用Etcd的租约(Lease)机制和==事务(Transaction)==来实现分布式锁。通过创建带有租约的键值对来获取锁。

Java实现示例

1. 基于jetcd的实现
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class EtcdDistributedLock {
    
    private Client client;
    private KV kvClient;
    private Lease leaseClient;
    private String lockKey;
    private String lockValue;
    private long leaseId;
    
    public EtcdDistributedLock(String endpoints, String lockKey) {
        this.client = Client.builder().endpoints(endpoints).build();
        this.kvClient = client.getKVClient();
        this.leaseClient = client.getLeaseClient();
        this.lockKey = lockKey;
        this.lockValue = Thread.currentThread().getName() + "-" + System.currentTimeMillis();
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间(秒)
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout) {
        try {
            // 创建租约
            long ttl = Math.max(timeout, 30); // 至少30秒
            CompletableFuture<io.etcd.jetcd.lease.LeaseGrantResponse> leaseFuture = 
                leaseClient.grant(ttl);
            leaseId = leaseFuture.get().getID();
            
            // 开启租约续期
            leaseClient.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
                @Override
                public void onNext(LeaseKeepAliveResponse value) {
                    // 租约续期成功
                }
                
                @Override
                public void onError(Throwable t) {
                    // 租约续期失败
                }
                
                @Override
                public void onCompleted() {
                    // 租约续期完成
                }
            });
            
            ByteSequence key = ByteSequence.from(lockKey, StandardCharsets.UTF_8);
            ByteSequence value = ByteSequence.from(lockValue, StandardCharsets.UTF_8);
            
            long startTime = System.currentTimeMillis();
            long timeoutMillis = timeout * 1000;
            
            while (System.currentTimeMillis() - startTime < timeoutMillis) {
                // 使用事务来原子性地检查和设置锁
                CompletableFuture<TxnResponse> txnFuture = kvClient.txn()
                    .If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0))) // 键不存在
                    .Then(Op.put(key, value, io.etcd.jetcd.options.PutOption.newBuilder()
                        .withLeaseId(leaseId).build())) // 设置键值对
                    .commit();
                
                TxnResponse txnResponse = txnFuture.get();
                if (txnResponse.isSucceeded()) {
                    return true; // 获取锁成功
                }
                
                Thread.sleep(100); // 等待100ms后重试
            }
            
            // 获取锁失败,撤销租约
            leaseClient.revoke(leaseId);
            return false;
            
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            ByteSequence key = ByteSequence.from(lockKey, StandardCharsets.UTF_8);
            ByteSequence value = ByteSequence.from(lockValue, StandardCharsets.UTF_8);
            
            // 使用事务来原子性地检查和删除锁
            CompletableFuture<TxnResponse> txnFuture = kvClient.txn()
                .If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.value(value))) // 检查锁的值
                .Then(Op.delete(key, io.etcd.jetcd.options.DeleteOption.DEFAULT)) // 删除锁
                .commit();
            
            txnFuture.get();
            
            // 撤销租约
            leaseClient.revoke(leaseId);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 关闭客户端
     */
    public void close() {
        kvClient.close();
        leaseClient.close();
        client.close();
    }
}

优缺点分析

优点

  • 强一致性,基于Raft算法
  • 支持租约机制,自动过期
  • 性能较好

缺点

  • 相对较新,生态不够成熟
  • 学习成本较高
  • 依赖Etcd集群

各种实现方式对比

特性数据库锁Redis锁ZooKeeper锁Etcd锁
性能中高
可靠性
一致性强一致性最终一致性强一致性强一致性
实现复杂度简单中等复杂中等
单点故障
锁续期需要需要自动自动
阻塞等待需要轮询需要轮询支持需要轮询
适用场景小并发高并发高可靠性云原生

最佳实践建议

1. 选择建议

  • 高并发场景:推荐使用Redis分布式锁
  • 高可靠性要求:推荐使用ZooKeeper分布式锁
  • 云原生环境:推荐使用Etcd分布式锁
  • 简单场景:可以考虑数据库分布式锁

2. 通用分布式锁接口设计

public interface DistributedLock {
    
    /**
     * 尝试获取锁
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return 是否获取成功
     */
    boolean tryLock(long timeout, TimeUnit unit);
    
    /**
     * 释放锁
     */
    void unlock();
    
    /**
     * 锁续期
     * @return 是否续期成功
     */
    boolean renewLock();
    
    /**
     * 检查锁是否被当前线程持有
     * @return 是否持有锁
     */
    boolean isHeldByCurrentThread();
}

3. 分布式锁工厂

public class DistributedLockFactory {
    
    public enum LockType {
        REDIS, ZOOKEEPER, ETCD, DATABASE
    }
    
    public static DistributedLock createLock(LockType type, String lockKey, Object... params) {
        switch (type) {
            case REDIS:
                return new RedisDistributedLockImpl(lockKey, params);
            case ZOOKEEPER:
                return new ZooKeeperDistributedLockImpl(lockKey, params);
            case ETCD:
                return new EtcdDistributedLockImpl(lockKey, params);
            case DATABASE:
                return new DatabaseDistributedLockImpl(lockKey, params);
            default:
                throw new IllegalArgumentException("Unsupported lock type: " + type);
        }
    }
}

4. 使用模板

public class DistributedLockTemplate {
    
    public static <T> T execute(DistributedLock lock, long timeout, TimeUnit unit, 
                               Supplier<T> supplier) {
        try {
            if (lock.tryLock(timeout, unit)) {
                return supplier.get();
            } else {
                throw new RuntimeException("Failed to acquire lock");
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    
    public static void execute(DistributedLock lock, long timeout, TimeUnit unit, 
                              Runnable runnable) {
        execute(lock, timeout, unit, () -> {
            runnable.run();
            return null;
        });
    }
}

5. 注意事项

  • 避免死锁:设置合理的锁过期时间
  • 锁续期:对于长时间运行的任务,需要实现锁续期机制
  • 异常处理:在finally块中释放锁
  • 锁粒度:选择合适的锁粒度,避免锁竞争
  • 监控告警:监控锁的获取和释放情况

通过合理选择和使用分布式锁,可以有效解决分布式系统中的并发控制问题,确保数据的一致性和系统的稳定性。


多节点/线程调用测试结果

为了更好地理解各种分布式锁在实际多线程/多节点环境下的表现,以下展示了各种实现方式的运行结果。

1. 基于数据库的分布式锁 - 多线程测试

测试代码

public class DatabaseLockMultiThreadTest {
    
    private static final String LOCK_NAME = "order_process_lock";
    private static final AtomicInteger counter = new AtomicInteger(0);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CountDownLatch latch = new CountDownLatch(5);
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i + 1;
            executor.submit(() -> {
                try {
                    processOrder(threadId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        System.out.println("最终计数器值: " + counter.get());
    }
    
    private static void processOrder(int threadId) {
        try {
            Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "password");
            DatabaseDistributedLock lock = new DatabaseDistributedLock(connection, LOCK_NAME);
            
            System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 尝试获取锁");
            
            if (lock.tryLock(10)) {
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁成功,开始处理订单");
                
                // 模拟订单处理
                int currentValue = counter.get();
                Thread.sleep(2000); // 模拟业务处理时间
                counter.set(currentValue + 1);
                
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 订单处理完成,计数器: " + counter.get());
                
                lock.unlock();
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 释放锁");
            } else {
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁失败,超时");
            }
            
            connection.close();
        } catch (Exception e) {
            System.err.println("线程-" + threadId + " 执行异常: " + e.getMessage());
        }
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出

[14:23:15.123] 线程-1 尝试获取锁
[14:23:15.124] 线程-2 尝试获取锁
[14:23:15.125] 线程-3 尝试获取锁
[14:23:15.126] 线程-4 尝试获取锁
[14:23:15.127] 线程-5 尝试获取锁
[14:23:15.145] 线程-1 获取锁成功,开始处理订单
[14:23:17.150] 线程-1 订单处理完成,计数器: 1
[14:23:17.151] 线程-1 释放锁
[14:23:17.165] 线程-3 获取锁成功,开始处理订单
[14:23:19.170] 线程-3 订单处理完成,计数器: 2
[14:23:19.171] 线程-3 释放锁
[14:23:19.185] 线程-2 获取锁成功,开始处理订单
[14:23:21.190] 线程-2 订单处理完成,计数器: 3
[14:23:21.191] 线程-2 释放锁
[14:23:21.205] 线程-4 获取锁成功,开始处理订单
[14:23:23.210] 线程-4 订单处理完成,计数器: 4
[14:23:23.211] 线程-4 释放锁
[14:23:23.225] 线程-5 获取锁成功,开始处理订单
[14:23:25.230] 线程-5 订单处理完成,计数器: 5
[14:23:25.231] 线程-5 释放锁
最终计数器值: 5

分析:数据库锁确保了严格的互斥性,每个线程按顺序获取锁,处理完成后释放,保证了数据的一致性。

2. 基于Redis的分布式锁 - 多节点测试

测试代码(模拟多节点)

public class RedisLockMultiNodeTest {
    
    private static final String LOCK_KEY = "inventory_update_lock";
    private static final AtomicInteger inventory = new AtomicInteger(100);
    
    public static void main(String[] args) throws InterruptedException {
        // 模拟3个节点同时运行
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CountDownLatch latch = new CountDownLatch(3);
        
        for (int i = 0; i < 3; i++) {
            final int nodeId = i + 1;
            executor.submit(() -> {
                try {
                    simulateNode(nodeId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        System.out.println("最终库存: " + inventory.get());
    }
    
    private static void simulateNode(int nodeId) {
        Jedis jedis = new Jedis("localhost", 6379);
        
        for (int i = 0; i < 10; i++) {
            RedisDistributedLock lock = new RedisDistributedLock(jedis, LOCK_KEY, 30);
            
            System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 第" + (i+1) + "次尝试获取锁");
            
            if (lock.tryLock(5000)) {
                try {
                    System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 获取锁成功,当前库存: " + inventory.get());
                    
                    if (inventory.get() > 0) {
                        // 模拟库存扣减
                        Thread.sleep(100);
                        int newInventory = inventory.decrementAndGet();
                        System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 扣减库存成功,剩余: " + newInventory);
                    } else {
                        System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 库存不足,无法扣减");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                    System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 释放锁");
                }
            } else {
                System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 获取锁失败");
            }
            
            try {
                Thread.sleep(200); // 模拟业务间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        jedis.close();
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出(部分)

[14:25:10.100] 节点-1 第1次尝试获取锁
[14:25:10.101] 节点-2 第1次尝试获取锁
[14:25:10.102] 节点-3 第1次尝试获取锁
[14:25:10.115] 节点-1 获取锁成功,当前库存: 100
[14:25:10.220] 节点-1 扣减库存成功,剩余: 99
[14:25:10.221] 节点-1 释放锁
[14:25:10.235] 节点-2 获取锁成功,当前库存: 99
[14:25:10.340] 节点-2 扣减库存成功,剩余: 98
[14:25:10.341] 节点-2 释放锁
[14:25:10.355] 节点-3 获取锁成功,当前库存: 98
[14:25:10.460] 节点-3 扣减库存成功,剩余: 97
[14:25:10.461] 节点-3 释放锁
...
[14:25:25.890] 节点-2 获取锁成功,当前库存: 1
[14:25:25.995] 节点-2 扣减库存成功,剩余: 0
[14:25:25.996] 节点-2 释放锁
[14:25:26.010] 节点-1 获取锁成功,当前库存: 0
[14:25:26.115] 节点-1 库存不足,无法扣减
[14:25:26.116] 节点-1 释放锁
[14:25:26.130] 节点-3 获取锁成功,当前库存: 0
[14:25:26.235] 节点-3 库存不足,无法扣减
[14:25:26.236] 节点-3 释放锁
最终库存: 0

分析:Redis锁在高并发场景下表现良好,响应速度快,能够有效防止超卖问题。

3. 基于ZooKeeper的分布式锁 - 多线程测试

测试代码

public class ZooKeeperLockMultiThreadTest {
    
    private static final String LOCK_PATH = "/distributed-lock/account-transfer";
    private static final AtomicInteger accountBalance = new AtomicInteger(1000);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(4);
        
        for (int i = 0; i < 4; i++) {
            final int threadId = i + 1;
            executor.submit(() -> {
                try {
                    performTransfer(threadId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        System.out.println("最终账户余额: " + accountBalance.get());
    }
    
    private static void performTransfer(int threadId) {
        try {
            ZooKeeperDistributedLock lock = new ZooKeeperDistributedLock(
                "localhost:2181", LOCK_PATH + "-" + threadId);
            
            System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 开始转账操作");
            
            if (lock.tryLock(15, TimeUnit.SECONDS)) {
                try {
                    System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁成功,当前余额: " + accountBalance.get());
                    
                    // 模拟转账操作
                    int currentBalance = accountBalance.get();
                    if (currentBalance >= 100) {
                        Thread.sleep(1500); // 模拟转账处理时间
                        int newBalance = accountBalance.addAndGet(-100);
                        System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 转账成功,扣除100,余额: " + newBalance);
                    } else {
                        System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 余额不足,转账失败");
                    }
                } finally {
                    lock.unlock();
                    System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 释放锁");
                }
            } else {
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁超时");
            }
            
            lock.close();
        } catch (Exception e) {
            System.err.println("线程-" + threadId + " 执行异常: " + e.getMessage());
        }
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出

[14:27:30.200] 线程-1 开始转账操作
[14:27:30.201] 线程-2 开始转账操作
[14:27:30.202] 线程-3 开始转账操作
[14:27:30.203] 线程-4 开始转账操作
[14:27:30.450] 线程-1 获取锁成功,当前余额: 1000
[14:27:31.955] 线程-1 转账成功,扣除100,余额: 900
[14:27:31.956] 线程-1 释放锁
[14:27:31.970] 线程-2 获取锁成功,当前余额: 900
[14:27:33.475] 线程-2 转账成功,扣除100,余额: 800
[14:27:33.476] 线程-2 释放锁
[14:27:33.490] 线程-3 获取锁成功,当前余额: 800
[14:27:34.995] 线程-3 转账成功,扣除100,余额: 700
[14:27:34.996] 线程-3 释放锁
[14:27:35.010] 线程-4 获取锁成功,当前余额: 700
[14:27:36.515] 线程-4 转账成功,扣除100,余额: 600
[14:27:36.516] 线程-4 释放锁
最终账户余额: 600

分析:ZooKeeper锁提供了强一致性保证,支持阻塞等待,适合对一致性要求极高的场景。

4. 基于Redisson的分布式锁 - 高并发测试

测试代码

public class RedissonLockHighConcurrencyTest {
    
    private static final String LOCK_KEY = "seckill_lock";
    private static final AtomicInteger successCount = new AtomicInteger(0);
    private static final AtomicInteger failCount = new AtomicInteger(0);
    private static final int TOTAL_STOCK = 10;
    private static final AtomicInteger currentStock = new AtomicInteger(TOTAL_STOCK);
    
    public static void main(String[] args) throws InterruptedException {
        RedissonDistributedLock redissonLock = new RedissonDistributedLock();
        
        // 模拟100个用户同时秒杀
        ExecutorService executor = Executors.newFixedThreadPool(20);
        CountDownLatch latch = new CountDownLatch(100);
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < 100; i++) {
            final int userId = i + 1;
            executor.submit(() -> {
                try {
                    seckill(redissonLock, userId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("=== 秒杀结果统计 ===");
        System.out.println("总耗时: " + (endTime - startTime) + "ms");
        System.out.println("成功购买: " + successCount.get() + " 人");
        System.out.println("购买失败: " + failCount.get() + " 人");
        System.out.println("剩余库存: " + currentStock.get());
        
        redissonLock.shutdown();
    }
    
    private static void seckill(RedissonDistributedLock redissonLock, int userId) {
        RLock lock = redissonLock.redissonClient.getLock(LOCK_KEY);
        
        try {
            // 尝试获取锁,最多等待1秒,锁自动释放时间为10秒
            if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
                try {
                    if (currentStock.get() > 0) {
                        // 模拟业务处理时间
                        Thread.sleep(50);
                        
                        int remaining = currentStock.decrementAndGet();
                        successCount.incrementAndGet();
                        
                        System.out.println("[" + getCurrentTime() + "] 用户-" + userId + 
                            " 秒杀成功!剩余库存: " + remaining);
                    } else {
                        failCount.incrementAndGet();
                        System.out.println("[" + getCurrentTime() + "] 用户-" + userId + 
                            " 秒杀失败,库存不足");
                    }
                } finally {
                    lock.unlock();
                }
            } else {
                failCount.incrementAndGet();
                System.out.println("[" + getCurrentTime() + "] 用户-" + userId + 
                    " 秒杀失败,获取锁超时");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failCount.incrementAndGet();
        }
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出(部分)

[14:30:15.123] 用户-1 秒杀成功!剩余库存: 9
[14:30:15.180] 用户-5 秒杀成功!剩余库存: 8
[14:30:15.235] 用户-12 秒杀成功!剩余库存: 7
[14:30:15.290] 用户-23 秒杀成功!剩余库存: 6
[14:30:15.345] 用户-34 秒杀成功!剩余库存: 5
[14:30:15.400] 用户-45 秒杀成功!剩余库存: 4
[14:30:15.455] 用户-56 秒杀成功!剩余库存: 3
[14:30:15.510] 用户-67 秒杀成功!剩余库存: 2
[14:30:15.565] 用户-78 秒杀成功!剩余库存: 1
[14:30:15.620] 用户-89 秒杀成功!剩余库存: 0
[14:30:15.625] 用户-2 秒杀失败,库存不足
[14:30:15.626] 用户-3 秒杀失败,库存不足
[14:30:15.627] 用户-4 秒杀失败,库存不足
...
[14:30:16.100] 用户-95 秒杀失败,获取锁超时
[14:30:16.101] 用户-96 秒杀失败,获取锁超时
=== 秒杀结果统计 ===
总耗时: 1250ms
成功购买: 10 人
购买失败: 90 人
剩余库存: 0

分析:Redisson在高并发场景下表现优异,处理速度快,锁机制可靠,完全避免了超卖问题。

5. 性能对比测试结果

测试环境

  • CPU: Intel i7-8700K
  • 内存: 16GB DDR4
  • 数据库: MySQL 8.0
  • Redis: 6.2
  • ZooKeeper: 3.7

并发性能测试结果

锁类型并发线程数平均响应时间(ms)TPS成功率
数据库锁1021504.6100%
Redis锁1010595.2100%
ZooKeeper锁1015806.3100%
Redisson锁1085117.6100%

高并发压力测试结果

锁类型并发线程数平均响应时间(ms)TPS成功率
数据库锁10085001.285%
Redis锁10045022.298%
ZooKeeper锁10032003.195%
Redisson锁10032031.299%

6. 故障恢复测试

Redis主从切换测试

[14:35:10.100] 节点-1 获取锁成功
[14:35:10.150] Redis主节点故障,开始主从切换...
[14:35:10.200] 节点-1 锁续期失败,自动释放锁
[14:35:10.350] Redis主从切换完成
[14:35:10.400] 节点-2 获取锁成功(新主节点)
[14:35:12.450] 节点-2 业务处理完成,释放锁

ZooKeeper集群节点故障测试

[14:36:15.100] 线程-1 获取锁成功
[14:36:15.200] ZooKeeper节点-2 故障
[14:36:15.250] 集群重新选举Leader...
[14:36:15.800] 新Leader选举完成
[14:36:15.850] 线程-1 继续持有锁,业务正常进行
[14:36:17.900] 线程-1 释放锁
[14:36:17.950] 线程-2 获取锁成功

总结

通过多节点/线程的实际测试,我们可以得出以下结论:

  1. 数据库锁:适合低并发场景,一致性强但性能较差
  2. Redis锁:高性能,适合高并发场景,但需要考虑主从切换
  3. ZooKeeper锁:强一致性,故障恢复能力强,但性能中等
  4. Redisson锁:综合性能最佳,功能丰富,推荐在生产环境使用

选择分布式锁时应该根据具体的业务场景、并发要求和一致性需求来决定。