目录

Linux笔记-Reactor模式与EpollReactor

Linux笔记—Reactor模式与EpollReactor

1. Reactor模式介绍

Reactor 模式是一种事件驱动(Event-Driven)的设计模式,主要用于高效处理并发 I/O 操作,尤其在网络编程中被广泛应用。其核心思想是通过一个或多个 “反应器”(Reactor)监听事件(如连接请求、数据到达、错误等),当事件发生时,自动将事件分发到对应的处理器(Handler)进行处理,从而避免传统阻塞 I/O 的效率问题。

1.1 Reactor 模式的核心组件

  • Reactor(反应器):模式的核心协调者,负责管理事件的注册、注销,并在事件发生时调度对应的处理器。它是事件循环的 “大脑”。
  • Event Demultiplexer(事件多路分发器):通常基于操作系统的 I/O 多路复用机制(如 select、poll、epoll、kqueue 等),用于阻塞等待多个 I/O 事件的发生,并将就绪的事件通知给 Reactor。
  • Handler(处理器):定义事件处理逻辑的组件,负责实际处理事件(如读取数据、发送响应、处理连接等)。每个事件类型通常对应一个 Handler。
  • Acceptor(接受器,可选):一种特殊的 Handler,专门处理新的连接请求(如 TCP 的三次握手),并在连接建立后为新连接注册对应的 I/O 事件和 Handler。

1.2 Reactor 模式的典型工作流程

  1. 初始化:创建 Reactor 和 Event Demultiplexer,注册初始事件(如服务器监听端口的 “连接事件”)及对应的 Handler(如 Acceptor)。
  2. 事件循环:Reactor 启动循环,通过 Event Demultiplexer 阻塞等待事件发生。
  3. 事件就绪:当有事件(如客户端连接、数据到达)发生时,Event Demultiplexer 将就绪事件列表返回给 Reactor。
  4. 事件分发:Reactor 根据事件类型,调用对应的 Handler 处理(如 Acceptor 处理连接请求,I/O Handler 处理数据读写)。
  5. 处理完成:Handler 处理完毕后,Reactor 继续循环等待下一批事件。

1.3 优缺点与适用场景

  • 优点
    • 高效处理并发 I/O,避免传统多线程 / 多进程的资源开销(如线程切换、内存占用)。
    • 事件驱动模型清晰,便于扩展和维护。
    • 适合 I/O 密集型场景(如网络服务器、消息中间件)。
  • 缺点
    • 事件处理逻辑若阻塞会影响整个系统(需配合线程池解决)。
    • 设计较复杂,需要正确处理事件注册、分发和异常。
  • 典型应用
    • 网络框架:Netty、Mina、Libevent。
    • 服务器:Nginx(处理网络事件)、Redis(处理客户端连接)。
    • 消息中间件:Kafka(网络 I/O 处理)。

2. EpollReactor网络服务

在Linux环境下,最适合用来实践Reactor模式的底层技术自然就是epoll。接下来,我们就逐步来实现一个基于epoll的Reactor网络服务。

2.1 封装epoll

既然是基于epoll来实现Reactor,那么我们首先将epoll进行一下封装。

在保留epoll原有接口的基础之上,我们可以将Epoll进行操作的基本单位(fd)封装为处理器。

将fd进行封装有一个重要的原因就是:在使用ET触发模式时,每个fd都需要自己的输入输出缓冲区来临时存储就绪的数据。

而我们要做的是一个Reactor网络服务,自然将其封装为处理器是最好的选择。

当然,处理器的种类是很多的,代表着不同类型的fd与不同的处理方式。但是,我们可以确定,对事件的处理有三类:读、写、错误(异常)。

据此,我们可以定义出如下的基类:

#pragma once
#include "Socket.hpp"
#include <memory>
using namespace SocketModule;

class Epoll;

// 事件处理器接口:定义事件处理方法
class EventHandler
{
public:
    // 处理读事件
    virtual void HandleRecv() {}
    // 处理写事件
    virtual void HandleSend() {}
    // 处理错误事件
    virtual void HandleExce() {}
    virtual int Fd() = 0;
    void SetMonitor(Epoll *monitor)
    {
        _monitor = monitor;
    }
    uint32_t Events()
    {
        return _events;
    }
    void SetEvents(uint32_t events)
    {
        _events = events;
    }

protected:
    EventHandler(Epoll *monitor = nullptr)
        : _monitor(monitor)
    {}
    ~EventHandler() {}
    std::string _recv_buffer; // 接收缓冲区
    std::string _send_buffer; // 发送缓冲区
    Epoll *_monitor;          // 监视该事件的Epoll实体
                              // 在处理就绪事件时可能需要调用Epoll实体的某些方法
    uint32_t _events;         // 关心的事件
};

用户可以根据自己的需求,在这个在这个基类的基础之上,定义出各种事件处理器,例如我们需要用到的TCP套接字的事件处理器:

#pragma once
#include "EventHandler.hpp"
#include "Epoll.hpp"
#include "Common.hpp"
#include <functional>

class ConnectSocketEH;
using service_t = std::function<std::string(std::string&)>;

// 连接套接字事件处理器
class ConnectSocketEH : public EventHandler
{
public:
    ConnectSocketEH(std::shared_ptr<TCPConnectSocket> connect_socket, service_t service, Epoll *monitor = nullptr)
        : EventHandler(monitor)
        , _connect_socket(std::move(connect_socket))
        , _service(service)
    {
        SetNonBlock(_connect_socket->SockFd());
        SetEvents(EPOLLET | EPOLLIN);
    }
    void HandleRecv() override
    {
        int n = 0;
        std::string tmp;
        while(true)
        {
            if((n = _connect_socket->Receive(tmp)) > 0)
            {
                LOG(LogLevel::INFO) << "收到新的消息: " << tmp;
                _recv_buffer += tmp;
            }
            else if(n == 0)
            {
                LOG(LogLevel::INFO) << "[" << _connect_socket->Addr().Info() << "]连接已断开! ";
                _monitor->DelEventHandler(_monitor->GetHandler(Fd()));
                break;
            }
            else if(errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if(errno == EINTR)
                continue;
            else
            {
                LOG(LogLevel::ERROR) << "ConnectSocketEH::HandleRecv: 获取数据出错! ";
                HandleExce();
                break;
            }
        }

        if(!_recv_buffer.empty())
            _send_buffer = _service(_recv_buffer); // 调用上层服务处理数据
        
        if(!_send_buffer.empty())
        {
            HandleSend(); // 最佳实践
            // 或者开启写事件关心
            // _monitor->ModEventHandler(_monitor->GetHandler(Fd()), Events() | EPOLLOUT);
        }
    }
    void HandleSend() override
    {
        while(true)
        {
            int n = _connect_socket->Send(_send_buffer);
            if(n > 0)
            {
                // 未发完,删掉已发部分继续发
                _send_buffer.erase(0, n);
            }
            else if(n == 0)
            {
                // 发完
                break;
            }
            else
            {
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if(errno == EINTR)
                    continue;
                else
                {
                    LOG(LogLevel::ERROR) << "ConnectSocketEH::HandleSend: 发送失败! ";
                    HandleExce();
                    break;
                }
            }
        }

        if(!_send_buffer.empty())
        {
            // 写条件不满足, 开启对写事件的关心
            SetEvents(Events() | EPOLLOUT);
            _monitor->ModEventHandler(_monitor->GetHandler(Fd()), Events());
        }
        else
        {
            // 数据已写完, 关闭对写事件的关心
            SetEvents(Events() & ~EPOLLOUT);
            _monitor->ModEventHandler(_monitor->GetHandler(Fd()), Events());
        }
    }
    void HandleExce() override
    {
        _monitor->DelEventHandler(_monitor->GetHandler(Fd()));
    }
    int Fd() override
    {
        return _connect_socket->SockFd();
    }
    ~ConnectSocketEH() {}

private:
    std::shared_ptr<TCPConnectSocket> _connect_socket;
    service_t _service;
};

// 监听套接字事件处理器
class ListenSocketEH : public EventHandler
{
public:
    ListenSocketEH(in_port_t port, service_t service, Epoll *monitor = nullptr)
        : EventHandler(monitor)
        , _service(service)
    {
        _listen_socket = std::make_shared<TCPListenSocket>(port);
        SetNonBlock(_listen_socket->SockFd());
        SetEvents(EPOLLET | EPOLLIN);
    }
    void HandleRecv() override
    {
        std::shared_ptr<TCPConnectSocket> connect_socket = nullptr;
        while (true)
        {
            if(connect_socket = _listen_socket->Accept())
            {
                std::shared_ptr<ConnectSocketEH> cs_eh = std::make_shared<ConnectSocketEH>(connect_socket, _service, _monitor);
                _monitor->AddEventHandler(cs_eh, EPOLLIN | EPOLLET);
            }
            else if(errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if(errno == EINTR)
                continue;
            else
            {
                LOG(LogLevel::ERROR) << "ListenSocketEH::HandleRecv: 建立新连接出错! ";
                break;
            }
        }
    }
    int Fd() override
    {
        return _listen_socket->SockFd();
    }
    ~ListenSocketEH() {}

private:
    std::shared_ptr<TCPListenSocket> _listen_socket;
    service_t _service;
};

使用EventHandler类作为操作的基本单位,在尽可能保留epoll原本使用流程的前提下,我们可以封装出如下的Epoll类:

#pragma once
#include "EventHandler.hpp"
#include <unordered_map>
#include <sys/epoll.h>
#include "Common.hpp"

class Epoll
{
public:
    Epoll()
    {
        _epfd = epoll_create1(0);
        if(_epfd == -1)
        {
            LOG(LogLevel::FATAL) << "EpollReactor: epoll实例创建失败! ";
            exit(EPCREATE_ERROR);
        }
    }
    // 添加事件处理器
    bool AddEventHandler(const std::shared_ptr<EventHandler>& event_handler, uint32_t events)
    {
        int fd = event_handler->Fd();
        if(_event_handlers.count(fd))
        {
            LOG(LogLevel::WARNING) << "EpollReactor::AddEventHandler: fd[" << fd << "]已存在! ";
            return false;
        }
        epoll_event epev;
        epev.data.fd = fd;
        epev.events = events;
        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &epev);
        if(n == -1)
        {
            LOG(LogLevel::ERROR) << "EpollReactor::AddEventHandler: fd[" << fd << "]添加失败! " << strerror(errno);
            return false;
        }
        _event_handlers[fd] = event_handler;
        return true;
    }
    // 删除事件处理器
    bool DelEventHandler(const std::shared_ptr<EventHandler>& event_handler)
    {
        int fd = event_handler->Fd();
        if(!_event_handlers.count(fd))
        {
            LOG(LogLevel::WARNING) << "EpollReactor::DelEventHandler: fd[" << fd << "]不存在! ";
            return false;
        }
        int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
        if(n == -1)
        {
            LOG(LogLevel::ERROR) << "EpollReactor::DelEventHandler: fd[" << fd << "]删除失败! " << strerror(errno);
            return false;
        }
        _event_handlers.erase(fd);
        return true;
    }
    // 修改事件处理器
    bool ModEventHandler(const std::shared_ptr<EventHandler>& event_handler, uint32_t events)
    {
        int fd = event_handler->Fd();
        if(!_event_handlers.count(fd))
        {
            LOG(LogLevel::WARNING) << "EpollReactor::ModEventHandler: fd[" << fd << "]不存在! ";
            return false;
        }
        epoll_event epev;
        epev.data.fd = fd;
        epev.events = events;
        int n = epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epev);
        if(n == -1)
        {
            LOG(LogLevel::ERROR) << "EpollReactor::ModEventHandler: fd[" << fd << "]修改失败! " << strerror(errno);
            return false;
        }
        return true;
    }
    // 等待事件触发
    int Wait(std::vector<epoll_event>& ready, int timeout)
    {
        int n = epoll_wait(_epfd, ready.data(), ready.size(), timeout);
        if(n == 0)
        {
            LOG(LogLevel::WARNING) << "EpollReactor::Wait: 超时... ";
        }
        else if(n == -1)
        {
            LOG(LogLevel::ERROR) << "EpollReactor::Wait: 出错! ";
        }
        return n;
    }
    // 获取对应文件描述符的事件处理器
    std::shared_ptr<EventHandler> GetHandler(int fd)
    {
        if(!_event_handlers.count(fd))
            return nullptr;
        return _event_handlers[fd];
    }
    size_t Size()
    {
        return _event_handlers.size();
    }
    ~Epoll() {}
private:
    int _epfd;
    std::unordered_map<int, std::shared_ptr<EventHandler>> _event_handlers;
};

2.2 封装EpollReactor

现在,我们有了处理器、接受器(TCPSocketEH),现在还缺少反应器与事件多路分发器。

但是,实际上我们的Epoll类已经完成了反应器和多路分发器的大部分工作,接下来只需要按照Reactor的模式的经典工作流程来使用Epoll类即可。

#pragma once
#include "Epoll.hpp"
#include "SocketEH.hpp"

class EpollReactor
{
private:
    static const int default_one_loop_num = 256;

    // 多路分发器
    void Dispatch(int n)
    {
        for(int i = 0; i < n; i++)
        {
            std::shared_ptr<EventHandler> eh_ptr = _epoll.GetHandler(_ready_events[i].data.fd);
            uint32_t events = _ready_events[i].events;
            if(events & EPOLLERR || events & EPOLLHUP)
                events = (EPOLLIN | EPOLLOUT);

            if(events & EPOLLIN)
                eh_ptr->HandleRecv();
            if(events & EPOLLOUT)
                eh_ptr->HandleSend();
        }
    }
public:
    EpollReactor(int one_loop_num = default_one_loop_num)
        : _isrunning(false)
        , _ready_events(one_loop_num)
    {}
    void Run(int timeout)
    {
        _isrunning = true;
        while(_isrunning)
        {
            int n = _epoll.Wait(_ready_events, timeout);
            if(n > 0)
            {
                LOG(LogLevel::INFO) << "有事件就绪...";
                Dispatch(n);
            }
        }
        _isrunning = false;
    }

    // 向_epoll中注册事件处理器
    bool RegisterEH(const std::shared_ptr<EventHandler>& event_handler, uint32_t events)
    {
        // 事件处理器在处理就绪事件时,可能会调用到Epoll类的方法
        event_handler->SetMonitor(&_epoll);
        return _epoll.AddEventHandler(event_handler, events);
    }
    ~EpollReactor() {}
private:
    Epoll _epoll;
    bool _isrunning;
    std::vector<epoll_event> _ready_events;
};

2.3 添加上层服务

现在我们的Reactor部分就完工了,上层的服务如何使用这个Reactor呢?

很简单,我们在ListenSockeEH和ConnectSocketEH当中都定义了一个字段—service。

上层的服务只需要将自己处理数据的服务注册到ListenSocketEH当中,再将这个ListenSocketEH实例注册到EpollReactor当中即可。

  • 每当有连接到来时(监听套接字的读事件),ListenSocketEH处理读事件的方法就会将这个服务注册到新创建的ConnectSocketEH当中;
  • 而每当有数据到来时(连接套接字的读事件),ConnectSocketEH就会尝试调用该服务来处理数据。

我们之前使用多进程/多线程的方式做过一个网络计算器:客户端将数据发送给服务器,服务器计算之后返回结果(功能很鸡肋,仅作练习用)。

我们可以改造一下这个计算器的协议部分,然后将其作为上层服务搬到这边来:

// 协议核心回调函数修改前
void ServerService(const std::shared_ptr<TCPConnectSocket>& socket)
{
    std::string package, buffer, json_str, send_msg;
    while(socket->Receive(buffer))
    {
        package += buffer;
        if(decapsulate(package, json_str))
        {
            Request request(json_str);
            // 使用顶层提供的回调函数来处理请求
            Response response = _handler(request);
            encapsulate(response.JsonStr(), send_msg);
            socket->Send(send_msg);
        }
    }
    LOG(LogLevel::INFO) << "[" << socket->addr().Info() << "]连接已断开! ";
}

// 协议核心回调函数修改后
std::string ServerService(std::string &package)
{
    std::string buffer, json_str, send_msg;
    if (decapsulate(package, json_str))
    {
        Request request(json_str);
        // 使用顶层提供的回调函数来处理请求
        Response response = _handler(request);
        encapsulate(response.JsonStr(), send_msg);
    }
    return send_msg;
}

接着,在主函数中一顿套娃:

#include "EpollReactor.hpp"
#include "Common.hpp"
#include "Calculator.hpp"
#include "Protocol.hpp"

int main(int argc, char *argv[])
{
    if(argc != 2)
    {
        LOG(LogLevel::FATAL) << "Usage: " << argv[0] << " + port";
        return USAGE_ERROR;
    }
    
    in_port_t port = std::stoi(argv[1]);
    
    // 顶层服务: 计算器
    Calculator calculator;

    // 协议层
    Protocol protocol([&calculator](const Request& request)->Response{
        return calculator.Calculate(request);
    });

    // 协议层回调函数注册到listen套接字事件处理器listen_eh
    // 该回调函数会被listen_eh注册到每个连接套接字的处理器
    std::shared_ptr<ListenSocketEH> listen_eh = std::make_shared<ListenSocketEH>(port, [&protocol](std::string& buffer){
        return protocol.ServerService(buffer);
    });

    // 创建reactor服务
    std::unique_ptr<EpollReactor> reactor = std::make_unique<EpollReactor>(256);
    // 把listen_eh注册到reactor服务中
    reactor->RegisterEH(listen_eh, listen_eh->Events());
    // 启动reactor服务
    reactor->Run(10000);
    return 0;
}

编译之后效果如下(注意带上-ljsoncpp选项):https://i-blog.csdnimg.cn/direct/a5f03178a733497187fa0acc5ef41dc2.png

完整代码请访问:

3. 多执行流的Reactor

高并发场景下,单线程 / 单进程的 Reactor 模式会面临性能瓶颈(如 CPU 利用率不足、单个线程被耗时操作阻塞等)。为此,衍生出多线程 Reactor和多进程 Reactor两种设计方案,通过利用多核 CPU 资源提升并发处理能力。

3.1 通过管道实现多进程的Reactor

https://i-blog.csdnimg.cn/direct/89ba0b1e1d4c40f8bc6ec6331c222390.png

  1. 主进程与子进程各自都拥有单独的Reactor,子进程继承主进程的ListenSocket;
  2. 父子进程之间通过管道通信;
  3. 管道读端需要被封装为EventHandler的子类,子进程监听管道的读端,读事件的处理方法为ListenSocket->Accept();
  4. 主进程只监听ListenSocket,当有新连接到来时,父进程根据需求选择一个子进程,并向对应的管道写入信息(任意信息);
  5. 子进程监听到管道读端的读事件就绪,调用读事件的处理方法(Accept并将新连接注册到自己的Reactor中)。

3.2 通过管道实现多线程的Reactor

https://i-blog.csdnimg.cn/direct/e06352631d3148749e2e81f809bb0fff.png

  1. 主线程程与其他线程各自都拥有单独的Reactor;
  2. 主线程负责监听ListenSocket,当有新连接到来时,直接Accept建立连接,获取套接字;
  3. 选择一个合适的线程,通过管道将新连接套接字的文件描述符通过管道传递给该线程;
  4. 其他线程监听管道的读端,读事件的处理方法为:将管道中传来的文件描述符注册到Reactor当中。

3.3 eventfd事件驱动

eventfd 是 Linux 内核提供的一种轻量级事件通知机制,通过创建一个特殊的文件描述符(file descriptor)实现进程间或线程间的事件传递。

它的核心作用是提供高效的 “事件触发” 能力,常被用于事件驱动编程(如 Reactor 模式)中,作为线程 / 进程间的信号通知工具。

3.3.1 基本原理

eventfd 的核心是一个内核维护的无符号 64 位计数器,以及一个关联的文件描述符(eventfd fd)。其工作机制如下:

  • 创建:通过 eventfd() 系统调用创建 eventfd,初始化计数器的值(默认为 0)。
  • 写入(触发事件):通过 write() 向 eventfd 写入一个 64 位整数,内核会将该值加到计数器上。
  • 读取(等待事件):通过 read() 从 eventfd 读取数据,内核会返回当前计数器的值,并将计数器重置为 0(或根据标志位调整行为)。
  • 事件就绪:当计数器的值大于 0 时,eventfd 会变为 “可读” 状态,可被 select/poll/epoll 等 I/O 多路复用机制检测到,从而触发事件处理逻辑。
3.3.2 关键特性
  • 轻量级:仅占用一个文件描述符,比传统的管道(pipe)更节省资源(管道需要两个 fd)。
  • 高效:操作直接在内核中完成,无需数据拷贝(仅传递计数器值),适合高频事件通知。
  • 可集成到 I/O 多路复用:eventfd 的文件描述符可被加入 epoll 等集合中,与其他 I/O 事件(如网络套接字)统一处理,符合 Reactor 模式的事件驱动模型。
  • 灵活的计数器行为:通过创建时的标志位控制读写逻辑,例如:
    • EFD_SEMAPHORE:使 read() 每次读取后计数器减 1(而非清零),类似信号量的行为。
    • EFD_NONBLOCK:设置为非阻塞模式,避免 read()/write() 阻塞。
    • EFD_CLOEXEC:进程 exec 时自动关闭该 fd,避免资源泄漏。
3.3.3 系统调用接口

(1)创建eventfd

#include <sys/eventfd.h>

int eventfd(unsigned int initval, int flags);
  • 参数
    • initval:计数器初始值(通常为 0)。
    • flags:标志位(如 EFD_NONBLOCK、EFD_SEMAPHORE 等),0 表示默认行为。
  • 返回值:成功返回 eventfd 的文件描述符(非负整数),失败返回 -1 并设置 errno。

(2)写入事件(触发)

通过 write() 向 eventfd 写入一个 64 位无符号整数(uint64_t),内核会将其加到计数器上:

uint64_t value = 1;  // 增加 1 到计数器
ssize_t ret = write(efd, &value, sizeof(value));

(3)读取事件(等待)

通过 read() 读取计数器的值,默认会将计数器清零:

uint64_t buf;
ssize_t ret = read(efd, &buf, sizeof(buf));
  • 若计数器 > 0,read() 返回 8 字节,buf 为计数器的当前值,之后计数器被重置为 0。
  • 若计数器 = 0,阻塞模式下 read() 会阻塞等待;非阻塞模式下返回 -1 并设置 errno = EAGAIN。
  • 若创建时指定 EFD_SEMAPHORE,read() 会将计数器减 1 并返回 1(而非清零),适合多线程按顺序处理事件的场景。
3.3.4 典型使用场景
  • 线程间事件通知:在多线程程序中(如 Reactor 模式的 Sub Reactor 线程),主线程可通过 eventfd 向子线程发送 “唤醒” 或 “任务就绪” 信号。
  • 进程间通信:多个进程可通过共享 eventfd(如通过 fork 继承或 sendmsg 传递 fd)实现简单的事件同步。
  • 与 epoll 结合的事件驱动框架:将 eventfd 加入 epoll 监听集合,与网络套接字的 I/O 事件统一处理,避免线程阻塞在 epoll_wait 时无法被外部信号唤醒。
3.3.5 使用eventfd实现多进程/多线程的Reactor
  • 多进程:直接使用eventfd替代管道即可。
  • 多线程:eventfd仅作为通知机制(无法传递数据),被主线程通知的线程需要通过公共的缓冲区来获取连接套接字。或者采用像多进程一样的方式,让被通知的进程访问ListenSocket并自己获取连接套接字。