目录

C项目仿muduo库高并发服务器-时间轮定时器

C++项目:仿muduo库高并发服务器——-时间轮定时器



前言

本篇文章介绍的实现TimerQueue模块需要用到的语法知识、结构设计、以及具体代码实现,学习时请结合具体项目学习。

回顾:
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执行,同时也可以通过刷新定时任务来延迟任务的执行。

  • 功能:定时任务模块,让任务能在指定时间后执行
  • 意义:组件内部用于释放非活跃连接(希望非活跃连接在N秒后被释放 )
  • 功能设计:
    1. 添加定时任务
    2. 刷新定时任务(使定时任务重新开始计时 )
    3. 取消定时任务

这个模块主要是对Connection对象的⽣命周期管理,对非活跃连接进行超时后的释放功能。


目前我们先实现一个功能模块,后续会结合其他模块,根据需要进行调整

一、创建定时器

因项目需要,这里只简单学习,帮助我们理解时间轮实现

1.1 timerfd_create 函数

Linux 系统下的系统调用函数 timerfd_create,用于创建一个定时器对象,返回一个文件描述符。 定时器到期时,系统自动向其文件描述符写入 8 字节数据(uint64_t 类型表示 “距离上一次读取到现在超时次数”)。具体操作方式和文件操作一样。如:

定时周期: 定时器设为 3 秒超时(后面介绍如何设置 ),系统会每隔 3 秒向定时器文件描述符 “写入计数”。
计数含义: read 时拿到的 8 字节数据(uint64_t 类型),是从上次读之后到当前的 “超时次数”。比如 30 秒没读,超时次数 = 30/3 = 10 次,read 就会拿到 10 。

 #include <sys/timerfd.h>
 int timerfd_create(int clockid, int flags);

参数

  • clockid:指定定时器使用的时钟类型,常见取值如:
    1. CLOCK_REALTIME:系统实时时钟(受系统时间修改影响 )。
    2. CLOCK_MONOTONIC:单调递增时钟(不受系统时间调整影响,适合测量间隔 )。
  • flags:标志位,可组合使用(如 TFD_NONBLOCK 设为非阻塞),传 0 表示默认阻塞行为。

阻塞
timerfd_create 创建的文件描述符,默认是阻塞模式
比如用 read(tfd, ...) 读定时器到期事件时:

  • 若定时器没到期,read卡住(阻塞),线程/进程暂停执行,直到定时器到期、有数据可读才返回。

这种阻塞方式是比较适合这里的需要的,所以使用默认设置即可,时钟类型选择 CLOCK_MONOTONIC

1.2 timerfd_settime

int timerfd_settime(int fd, int flags,
                    const struct itimerspec *new_value,
                    struct itimerspec *old_value);

用于配置 timerfd_create 创建的定时器,设置首次超时时间、周期超时间隔。

参数

  • fdtimerfd_create 返回的定时器文件描述符。
  • flags:控制行为(如 0 用相对时间)。

相对时间:是以当前时间为起始点来计算定时器的超时时间,如:在 12 点 0 分 0 秒调用 timerfd_settime ,那么定时器会已该时间作为基准判断是否超时。

  • newitimerspec 结构体指针,配置 首次超时(it_value周期间隔(it_interval,精度到纳秒(tv_sec 秒 + tv_nsec 纳秒 )。

首次超时指启动定时器后,经过设定时长即判定为超时;周期间隔指首次超时发生后,每隔设定时长就判定一次超时 。 比如 12 点 0 分 0 秒调用函数,若首次超时设为 3 秒、周期间隔设为 2 秒:定时器会在 12 点 0 分 3 秒触发第一次超时,此后每隔 2 秒(12 点 0 分 5 秒、12 点 0 分 7 秒…… )触发一次超时 。

struct timespec {
    time_t tv_sec;    /* 秒数 */
    long   tv_nsec;   /* 纳秒数 */
};

struct itimerspec {
    struct timespec it_interval;  /* 周期性定时器的时间间隔 */
    struct timespec it_value;     /* 初始超时时间 */
};
  • old:若非 NULL,传出之前的定时器配置。
#include <iostream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/timerfd.h>
#include <sys/select.h>

int main()
{
    // 创建一个定时器
    int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
    
    struct itimerspec itm;
    itm.it_value.tv_sec = 3;        // 设置第一次超时的时间(秒)
    itm.it_value.tv_nsec = 0;       // 第一次超时的纳秒部分
    itm.it_interval.tv_sec = 3;     // 第一次超时后,每隔多长时间超时(秒)
    itm.it_interval.tv_nsec = 0;    // 周期间隔的纳秒部分
    
    // 启动定时器
    timerfd_settime(timerfd, 0, &itm, NULL);
    
    // 这个定时器描述符将每隔三秒都会触发一次可读事件
    time_t start = time(NULL);
    
    while (1) {
        uint64_t tmp;
        
        // 注意:定时器超时后,描述符触发可读事件,必须读取8字节数据
        // 数据保存的是自上次启动定时器或read后的超时次数
        int ret = read(timerfd, &tmp, sizeof(tmp));
        if (ret < 0) {
            return -1;
        }
        
        std::cout << tmp << " " << time(NULL) - start << std::endl;
    }
    
    close(timerfd);
    return 0;
}

二、管理定时任务

结合代码理解
在 TimerQueue 模块中,必然存在大量的超时任务。目前我们仅知道如何为任务创建定时器的方法,但关键问题在于:如何高效判断任务是否超时?若逐个读取定时器来检查,会产生较大的开销,显然采用定时器的方法是不合理的。因此,我们采用时间轮机制,以此实现对任务的高效超时管理。
https://i-blog.csdnimg.cn/direct/a3bb11b58ab3409faa8020f1f1064b88.png

这样只需要秒级时间轮60,分级60,时级24,就可以表示几乎所有的情况,如果不够可以再加年…

目前同一时刻的定时任务只能添加一个,但是我们需支持同一时刻添加多个定时任务的时间轮。
解决方案:将时间轮的一维数组设计为二维数组,让时间轮一维数组的每个节点本身也是一个数组

到了现在我们解决了对超时任务的执行,但是仍面临着一个问题,对“非活跃连接(假设30s 无通信则销毁)”的定时任务,若连接在 30s 内有数据通信,原定时销毁逻辑需延迟执行(因此时连接仍活跃,不该销毁 ),该如何来对链接任务进行延迟销毁呢?。

通过 类的析构函数 + shared_ptr 智能指针 实现定时任务的延时控制。利用智能指针的引用计数和析构时机,间接让定时任务“延迟生效”。

  • 把“任务执行逻辑”放到类的析构函数 → 类对象销毁时自动触发任务。
  • shared_ptr 管理该类对象,将 shared_ptr 放入时间轮等待调度。
  • 若等待期间有 IO 事件 → 新建 shared_ptr 替换时间轮里的旧对象 → 旧对象因引用计数未清零延迟销毁 → 任务执行被延后。

时间轮每个槽位存 shared_ptr

  • 正常流程:到 tick 时销毁 shared_ptr → 触发析构执行任务。
  • 有 IO 时:提前用新 shared_ptr 替换旧对象 → 旧对象暂时不销毁(引用计数 >0 )→ 任务延迟到新对象的 tick 执行。

https://i-blog.csdnimg.cn/direct/62bb3d7f03974e7d94ba46acc689dd3f.png

三、代码实现

#include<iostream>
#include<functional>
#include<vector>
#include<memory>
#include<unordered_map>
#include<unistd.h>
using TaskFunc=std::function<void()>;//对类型起别名
using ReleaseFunc=std::function<void()>;
//定时任务对象
class TimerTask{
public:
    TimerTask(uint64_t id,uint32_t timeout,const TaskFunc&cb)
    :_id(id)
    ,_timeout(timeout)
    ,_task_cb(cb)
    ,_cancel(false)
    {}
    ~TimerTask()
    {
        if(!_cancel)_task_cb(); //如果任务有效就对象销毁执行任务
        _release();
    }
    void SetRelsease(const ReleaseFunc &cb){_release=cb;}
    uint64_t TimerOut()
    {
        return _timeout;
    }
    void Cancel()  //取消定时任务
    {
        _cancel=true;
    }

private:
    uint64_t _id;        //标识唯一的定时任务对象
    uint32_t _timeout;   //定时任务的超时时间
    TaskFunc _task_cb;   //要执行的定时任务
    bool _cancel;        //是否取消任务,false 否,true 是
    ReleaseFunc _release;//任务执行后删除TimerWheel中保存的定时任务对象信息

};

//时间轮
class TimerWheel{
public:
    TimerWheel():_tick(0),_capacity(60),_wheel(_capacity)
    {}

    //向轮表中添加定时器任务
    void TimerAdd(uint64_t id,uint32_t timeout,const TaskFunc&cb)
    {
        PtrTask pt(new TimerTask(id,timeout,cb));

        pt->SetRelsease(std::bind(&TimerWheel::RemoveTimer,this,id));
        int pos=(_tick+timeout)%_capacity;
        _wheel[pos].push_back(pt);
        _times[id]=WeakTask(pt);
    }

    //刷新/延时定时任务
    void TimerRefresh(uint64_t id)
    {
        auto it=_times.find(id);
        if(it==_times.end())
        {
            return;//不存在定时任务,不应该刷新
        }
        PtrTask pt=it->second.lock();//获取weak_ptr中管理的shared_ptr

        int pos=(_tick+pt->TimerOut())%_capacity;
        _wheel[pos].push_back(pt);//延时定时任务

    }
    //执行超时任务
    void RumTimerTask()
    {

        int pos=(_tick+1)%_capacity;
        _tick=pos;
        _wheel[pos].clear();//清空该位置的数组,就会将数组中保存的shared_ptr的对象全部销毁
    }
    void TimerCancel(uint64_t id)
    {
        auto it=_times.find(id);
        if(it==_times.end())
        {
            return;//不存在定时任务
        }
        PtrTask pt=it->second.lock();//获取weak_ptr中管理的shared_ptr
        pt->Cancel();//取消任务
    }
private:
    //定时任务执行后将任务从wheel中移除
    void RemoveTimer(uint64_t id)
    {
        auto it=_times.find(id);
        if(it!=_times.end())
        {
            _times.erase(it);
        }
    }

private:
    using PtrTask=std::shared_ptr<TimerTask>;
    using WeakTask=std::weak_ptr<TimerTask>;
    size_t _tick;       //滴答指针,走到哪里执行哪里的任务
    size_t _capacity;   //轮表最大容量,也表示当前时间轮最大延时秒数
    std::vector<std::vector<PtrTask>>_wheel;
    //使用weak_ptr对定时器任务保存帮助我们对定时器任务进行刷新设置
    std::unordered_map<uint64_t,WeakTask> _times;
};

//测试部分
// struct Test{
//     Test(){std::cout<<"构造"<<std::endl;}
//     ~Test(){std::cout<<"析构"<<std::endl;}
// };
// void DeleteTask(Test*ptr)
// {
//     delete ptr;
// }
// int main()
// {
//     Test*ptr=new Test();
//     TimerWheel wheel;
//     wheel.TimerAdd(888,5,std::bind(DeleteTask,ptr));

//     for(int i=0;i<5;i++)
//     {
//         wheel.RumTimerTask();//滴答指针运行
//         wheel.TimerRefresh(888);//刷新延时任务
//         std::cout<<"延时任务刷新,在五秒后执行"<<std::endl;
//         sleep(1);
//     }
//     wheel.TimerCancel(888);
//     while(1)
//     {
//         wheel.RumTimerTask();
//         std::cout<<"*********"<<std::endl;
//         sleep(1);
//     }
//     return 0;
// }

后续使用时,只需要根据项目需要,对该模块略微调整即可