Linux笔记-Reactor模式与EpollReactor
Linux笔记—Reactor模式与EpollReactor
1. Reactor模式介绍
Reactor 模式是一种事件驱动(Event-Driven)的设计模式,主要用于高效处理并发 I/O 操作,尤其在网络编程中被广泛应用。其核心思想是通过一个或多个 “反应器”(Reactor)监听事件(如连接请求、数据到达、错误等),当事件发生时,自动将事件分发到对应的处理器(Handler)进行处理,从而避免传统阻塞 I/O 的效率问题。
1.1 Reactor 模式的核心组件
- Reactor(反应器):模式的核心协调者,负责管理事件的注册、注销,并在事件发生时调度对应的处理器。它是事件循环的 “大脑”。
- Event Demultiplexer(事件多路分发器):通常基于操作系统的 I/O 多路复用机制(如 select、poll、epoll、kqueue 等),用于阻塞等待多个 I/O 事件的发生,并将就绪的事件通知给 Reactor。
- Handler(处理器):定义事件处理逻辑的组件,负责实际处理事件(如读取数据、发送响应、处理连接等)。每个事件类型通常对应一个 Handler。
- Acceptor(接受器,可选):一种特殊的 Handler,专门处理新的连接请求(如 TCP 的三次握手),并在连接建立后为新连接注册对应的 I/O 事件和 Handler。
1.2 Reactor 模式的典型工作流程
- 初始化:创建 Reactor 和 Event Demultiplexer,注册初始事件(如服务器监听端口的 “连接事件”)及对应的 Handler(如 Acceptor)。
- 事件循环:Reactor 启动循环,通过 Event Demultiplexer 阻塞等待事件发生。
- 事件就绪:当有事件(如客户端连接、数据到达)发生时,Event Demultiplexer 将就绪事件列表返回给 Reactor。
- 事件分发:Reactor 根据事件类型,调用对应的 Handler 处理(如 Acceptor 处理连接请求,I/O Handler 处理数据读写)。
- 处理完成:Handler 处理完毕后,Reactor 继续循环等待下一批事件。
1.3 优缺点与适用场景
- 优点:
- 高效处理并发 I/O,避免传统多线程 / 多进程的资源开销(如线程切换、内存占用)。
- 事件驱动模型清晰,便于扩展和维护。
- 适合 I/O 密集型场景(如网络服务器、消息中间件)。
- 缺点:
- 事件处理逻辑若阻塞会影响整个系统(需配合线程池解决)。
- 设计较复杂,需要正确处理事件注册、分发和异常。
- 典型应用:
- 网络框架:Netty、Mina、Libevent。
- 服务器:Nginx(处理网络事件)、Redis(处理客户端连接)。
- 消息中间件:Kafka(网络 I/O 处理)。
2. EpollReactor网络服务
在Linux环境下,最适合用来实践Reactor模式的底层技术自然就是epoll。接下来,我们就逐步来实现一个基于epoll的Reactor网络服务。
2.1 封装epoll
既然是基于epoll来实现Reactor,那么我们首先将epoll进行一下封装。
在保留epoll原有接口的基础之上,我们可以将Epoll进行操作的基本单位(fd)封装为处理器。
将fd进行封装有一个重要的原因就是:在使用ET触发模式时,每个fd都需要自己的输入输出缓冲区来临时存储就绪的数据。
而我们要做的是一个Reactor网络服务,自然将其封装为处理器是最好的选择。
当然,处理器的种类是很多的,代表着不同类型的fd与不同的处理方式。但是,我们可以确定,对事件的处理有三类:读、写、错误(异常)。
据此,我们可以定义出如下的基类:
#pragma once
#include "Socket.hpp"
#include <memory>
using namespace SocketModule;
class Epoll;
// 事件处理器接口:定义事件处理方法
class EventHandler
{
public:
// 处理读事件
virtual void HandleRecv() {}
// 处理写事件
virtual void HandleSend() {}
// 处理错误事件
virtual void HandleExce() {}
virtual int Fd() = 0;
void SetMonitor(Epoll *monitor)
{
_monitor = monitor;
}
uint32_t Events()
{
return _events;
}
void SetEvents(uint32_t events)
{
_events = events;
}
protected:
EventHandler(Epoll *monitor = nullptr)
: _monitor(monitor)
{}
~EventHandler() {}
std::string _recv_buffer; // 接收缓冲区
std::string _send_buffer; // 发送缓冲区
Epoll *_monitor; // 监视该事件的Epoll实体
// 在处理就绪事件时可能需要调用Epoll实体的某些方法
uint32_t _events; // 关心的事件
};
用户可以根据自己的需求,在这个在这个基类的基础之上,定义出各种事件处理器,例如我们需要用到的TCP套接字的事件处理器:
#pragma once
#include "EventHandler.hpp"
#include "Epoll.hpp"
#include "Common.hpp"
#include <functional>
class ConnectSocketEH;
using service_t = std::function<std::string(std::string&)>;
// 连接套接字事件处理器
class ConnectSocketEH : public EventHandler
{
public:
ConnectSocketEH(std::shared_ptr<TCPConnectSocket> connect_socket, service_t service, Epoll *monitor = nullptr)
: EventHandler(monitor)
, _connect_socket(std::move(connect_socket))
, _service(service)
{
SetNonBlock(_connect_socket->SockFd());
SetEvents(EPOLLET | EPOLLIN);
}
void HandleRecv() override
{
int n = 0;
std::string tmp;
while(true)
{
if((n = _connect_socket->Receive(tmp)) > 0)
{
LOG(LogLevel::INFO) << "收到新的消息: " << tmp;
_recv_buffer += tmp;
}
else if(n == 0)
{
LOG(LogLevel::INFO) << "[" << _connect_socket->Addr().Info() << "]连接已断开! ";
_monitor->DelEventHandler(_monitor->GetHandler(Fd()));
break;
}
else if(errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
LOG(LogLevel::ERROR) << "ConnectSocketEH::HandleRecv: 获取数据出错! ";
HandleExce();
break;
}
}
if(!_recv_buffer.empty())
_send_buffer = _service(_recv_buffer); // 调用上层服务处理数据
if(!_send_buffer.empty())
{
HandleSend(); // 最佳实践
// 或者开启写事件关心
// _monitor->ModEventHandler(_monitor->GetHandler(Fd()), Events() | EPOLLOUT);
}
}
void HandleSend() override
{
while(true)
{
int n = _connect_socket->Send(_send_buffer);
if(n > 0)
{
// 未发完,删掉已发部分继续发
_send_buffer.erase(0, n);
}
else if(n == 0)
{
// 发完
break;
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
LOG(LogLevel::ERROR) << "ConnectSocketEH::HandleSend: 发送失败! ";
HandleExce();
break;
}
}
}
if(!_send_buffer.empty())
{
// 写条件不满足, 开启对写事件的关心
SetEvents(Events() | EPOLLOUT);
_monitor->ModEventHandler(_monitor->GetHandler(Fd()), Events());
}
else
{
// 数据已写完, 关闭对写事件的关心
SetEvents(Events() & ~EPOLLOUT);
_monitor->ModEventHandler(_monitor->GetHandler(Fd()), Events());
}
}
void HandleExce() override
{
_monitor->DelEventHandler(_monitor->GetHandler(Fd()));
}
int Fd() override
{
return _connect_socket->SockFd();
}
~ConnectSocketEH() {}
private:
std::shared_ptr<TCPConnectSocket> _connect_socket;
service_t _service;
};
// 监听套接字事件处理器
class ListenSocketEH : public EventHandler
{
public:
ListenSocketEH(in_port_t port, service_t service, Epoll *monitor = nullptr)
: EventHandler(monitor)
, _service(service)
{
_listen_socket = std::make_shared<TCPListenSocket>(port);
SetNonBlock(_listen_socket->SockFd());
SetEvents(EPOLLET | EPOLLIN);
}
void HandleRecv() override
{
std::shared_ptr<TCPConnectSocket> connect_socket = nullptr;
while (true)
{
if(connect_socket = _listen_socket->Accept())
{
std::shared_ptr<ConnectSocketEH> cs_eh = std::make_shared<ConnectSocketEH>(connect_socket, _service, _monitor);
_monitor->AddEventHandler(cs_eh, EPOLLIN | EPOLLET);
}
else if(errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
LOG(LogLevel::ERROR) << "ListenSocketEH::HandleRecv: 建立新连接出错! ";
break;
}
}
}
int Fd() override
{
return _listen_socket->SockFd();
}
~ListenSocketEH() {}
private:
std::shared_ptr<TCPListenSocket> _listen_socket;
service_t _service;
};
使用EventHandler类作为操作的基本单位,在尽可能保留epoll原本使用流程的前提下,我们可以封装出如下的Epoll类:
#pragma once
#include "EventHandler.hpp"
#include <unordered_map>
#include <sys/epoll.h>
#include "Common.hpp"
class Epoll
{
public:
Epoll()
{
_epfd = epoll_create1(0);
if(_epfd == -1)
{
LOG(LogLevel::FATAL) << "EpollReactor: epoll实例创建失败! ";
exit(EPCREATE_ERROR);
}
}
// 添加事件处理器
bool AddEventHandler(const std::shared_ptr<EventHandler>& event_handler, uint32_t events)
{
int fd = event_handler->Fd();
if(_event_handlers.count(fd))
{
LOG(LogLevel::WARNING) << "EpollReactor::AddEventHandler: fd[" << fd << "]已存在! ";
return false;
}
epoll_event epev;
epev.data.fd = fd;
epev.events = events;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &epev);
if(n == -1)
{
LOG(LogLevel::ERROR) << "EpollReactor::AddEventHandler: fd[" << fd << "]添加失败! " << strerror(errno);
return false;
}
_event_handlers[fd] = event_handler;
return true;
}
// 删除事件处理器
bool DelEventHandler(const std::shared_ptr<EventHandler>& event_handler)
{
int fd = event_handler->Fd();
if(!_event_handlers.count(fd))
{
LOG(LogLevel::WARNING) << "EpollReactor::DelEventHandler: fd[" << fd << "]不存在! ";
return false;
}
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
if(n == -1)
{
LOG(LogLevel::ERROR) << "EpollReactor::DelEventHandler: fd[" << fd << "]删除失败! " << strerror(errno);
return false;
}
_event_handlers.erase(fd);
return true;
}
// 修改事件处理器
bool ModEventHandler(const std::shared_ptr<EventHandler>& event_handler, uint32_t events)
{
int fd = event_handler->Fd();
if(!_event_handlers.count(fd))
{
LOG(LogLevel::WARNING) << "EpollReactor::ModEventHandler: fd[" << fd << "]不存在! ";
return false;
}
epoll_event epev;
epev.data.fd = fd;
epev.events = events;
int n = epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epev);
if(n == -1)
{
LOG(LogLevel::ERROR) << "EpollReactor::ModEventHandler: fd[" << fd << "]修改失败! " << strerror(errno);
return false;
}
return true;
}
// 等待事件触发
int Wait(std::vector<epoll_event>& ready, int timeout)
{
int n = epoll_wait(_epfd, ready.data(), ready.size(), timeout);
if(n == 0)
{
LOG(LogLevel::WARNING) << "EpollReactor::Wait: 超时... ";
}
else if(n == -1)
{
LOG(LogLevel::ERROR) << "EpollReactor::Wait: 出错! ";
}
return n;
}
// 获取对应文件描述符的事件处理器
std::shared_ptr<EventHandler> GetHandler(int fd)
{
if(!_event_handlers.count(fd))
return nullptr;
return _event_handlers[fd];
}
size_t Size()
{
return _event_handlers.size();
}
~Epoll() {}
private:
int _epfd;
std::unordered_map<int, std::shared_ptr<EventHandler>> _event_handlers;
};
2.2 封装EpollReactor
现在,我们有了处理器、接受器(TCPSocketEH),现在还缺少反应器与事件多路分发器。
但是,实际上我们的Epoll类已经完成了反应器和多路分发器的大部分工作,接下来只需要按照Reactor的模式的经典工作流程来使用Epoll类即可。
#pragma once
#include "Epoll.hpp"
#include "SocketEH.hpp"
class EpollReactor
{
private:
static const int default_one_loop_num = 256;
// 多路分发器
void Dispatch(int n)
{
for(int i = 0; i < n; i++)
{
std::shared_ptr<EventHandler> eh_ptr = _epoll.GetHandler(_ready_events[i].data.fd);
uint32_t events = _ready_events[i].events;
if(events & EPOLLERR || events & EPOLLHUP)
events = (EPOLLIN | EPOLLOUT);
if(events & EPOLLIN)
eh_ptr->HandleRecv();
if(events & EPOLLOUT)
eh_ptr->HandleSend();
}
}
public:
EpollReactor(int one_loop_num = default_one_loop_num)
: _isrunning(false)
, _ready_events(one_loop_num)
{}
void Run(int timeout)
{
_isrunning = true;
while(_isrunning)
{
int n = _epoll.Wait(_ready_events, timeout);
if(n > 0)
{
LOG(LogLevel::INFO) << "有事件就绪...";
Dispatch(n);
}
}
_isrunning = false;
}
// 向_epoll中注册事件处理器
bool RegisterEH(const std::shared_ptr<EventHandler>& event_handler, uint32_t events)
{
// 事件处理器在处理就绪事件时,可能会调用到Epoll类的方法
event_handler->SetMonitor(&_epoll);
return _epoll.AddEventHandler(event_handler, events);
}
~EpollReactor() {}
private:
Epoll _epoll;
bool _isrunning;
std::vector<epoll_event> _ready_events;
};
2.3 添加上层服务
现在我们的Reactor部分就完工了,上层的服务如何使用这个Reactor呢?
很简单,我们在ListenSockeEH和ConnectSocketEH当中都定义了一个字段—service。
上层的服务只需要将自己处理数据的服务注册到ListenSocketEH当中,再将这个ListenSocketEH实例注册到EpollReactor当中即可。
- 每当有连接到来时(监听套接字的读事件),ListenSocketEH处理读事件的方法就会将这个服务注册到新创建的ConnectSocketEH当中;
- 而每当有数据到来时(连接套接字的读事件),ConnectSocketEH就会尝试调用该服务来处理数据。
我们之前使用多进程/多线程的方式做过一个网络计算器:客户端将数据发送给服务器,服务器计算之后返回结果(功能很鸡肋,仅作练习用)。
我们可以改造一下这个计算器的协议部分,然后将其作为上层服务搬到这边来:
// 协议核心回调函数修改前
void ServerService(const std::shared_ptr<TCPConnectSocket>& socket)
{
std::string package, buffer, json_str, send_msg;
while(socket->Receive(buffer))
{
package += buffer;
if(decapsulate(package, json_str))
{
Request request(json_str);
// 使用顶层提供的回调函数来处理请求
Response response = _handler(request);
encapsulate(response.JsonStr(), send_msg);
socket->Send(send_msg);
}
}
LOG(LogLevel::INFO) << "[" << socket->addr().Info() << "]连接已断开! ";
}
// 协议核心回调函数修改后
std::string ServerService(std::string &package)
{
std::string buffer, json_str, send_msg;
if (decapsulate(package, json_str))
{
Request request(json_str);
// 使用顶层提供的回调函数来处理请求
Response response = _handler(request);
encapsulate(response.JsonStr(), send_msg);
}
return send_msg;
}
接着,在主函数中一顿套娃:
#include "EpollReactor.hpp"
#include "Common.hpp"
#include "Calculator.hpp"
#include "Protocol.hpp"
int main(int argc, char *argv[])
{
if(argc != 2)
{
LOG(LogLevel::FATAL) << "Usage: " << argv[0] << " + port";
return USAGE_ERROR;
}
in_port_t port = std::stoi(argv[1]);
// 顶层服务: 计算器
Calculator calculator;
// 协议层
Protocol protocol([&calculator](const Request& request)->Response{
return calculator.Calculate(request);
});
// 协议层回调函数注册到listen套接字事件处理器listen_eh
// 该回调函数会被listen_eh注册到每个连接套接字的处理器
std::shared_ptr<ListenSocketEH> listen_eh = std::make_shared<ListenSocketEH>(port, [&protocol](std::string& buffer){
return protocol.ServerService(buffer);
});
// 创建reactor服务
std::unique_ptr<EpollReactor> reactor = std::make_unique<EpollReactor>(256);
// 把listen_eh注册到reactor服务中
reactor->RegisterEH(listen_eh, listen_eh->Events());
// 启动reactor服务
reactor->Run(10000);
return 0;
}
编译之后效果如下(注意带上-ljsoncpp选项):
完整代码请访问:
3. 多执行流的Reactor
在高并发场景下,单线程 / 单进程的 Reactor 模式会面临性能瓶颈(如 CPU 利用率不足、单个线程被耗时操作阻塞等)。为此,衍生出多线程 Reactor和多进程 Reactor两种设计方案,通过利用多核 CPU 资源提升并发处理能力。
3.1 通过管道实现多进程的Reactor
- 主进程与子进程各自都拥有单独的Reactor,子进程继承主进程的ListenSocket;
- 父子进程之间通过管道通信;
- 管道读端需要被封装为EventHandler的子类,子进程监听管道的读端,读事件的处理方法为ListenSocket->Accept();
- 主进程只监听ListenSocket,当有新连接到来时,父进程根据需求选择一个子进程,并向对应的管道写入信息(任意信息);
- 子进程监听到管道读端的读事件就绪,调用读事件的处理方法(Accept并将新连接注册到自己的Reactor中)。
3.2 通过管道实现多线程的Reactor
- 主线程程与其他线程各自都拥有单独的Reactor;
- 主线程负责监听ListenSocket,当有新连接到来时,直接Accept建立连接,获取套接字;
- 选择一个合适的线程,通过管道将新连接套接字的文件描述符通过管道传递给该线程;
- 其他线程监听管道的读端,读事件的处理方法为:将管道中传来的文件描述符注册到Reactor当中。
3.3 eventfd事件驱动
eventfd 是 Linux 内核提供的一种轻量级事件通知机制,通过创建一个特殊的文件描述符(file descriptor)实现进程间或线程间的事件传递。
它的核心作用是提供高效的 “事件触发” 能力,常被用于事件驱动编程(如 Reactor 模式)中,作为线程 / 进程间的信号通知工具。
3.3.1 基本原理
eventfd 的核心是一个内核维护的无符号 64 位计数器,以及一个关联的文件描述符(eventfd fd)。其工作机制如下:
- 创建:通过 eventfd() 系统调用创建 eventfd,初始化计数器的值(默认为 0)。
- 写入(触发事件):通过 write() 向 eventfd 写入一个 64 位整数,内核会将该值加到计数器上。
- 读取(等待事件):通过 read() 从 eventfd 读取数据,内核会返回当前计数器的值,并将计数器重置为 0(或根据标志位调整行为)。
- 事件就绪:当计数器的值大于 0 时,eventfd 会变为 “可读” 状态,可被 select/poll/epoll 等 I/O 多路复用机制检测到,从而触发事件处理逻辑。
3.3.2 关键特性
- 轻量级:仅占用一个文件描述符,比传统的管道(pipe)更节省资源(管道需要两个 fd)。
- 高效:操作直接在内核中完成,无需数据拷贝(仅传递计数器值),适合高频事件通知。
- 可集成到 I/O 多路复用:eventfd 的文件描述符可被加入 epoll 等集合中,与其他 I/O 事件(如网络套接字)统一处理,符合 Reactor 模式的事件驱动模型。
- 灵活的计数器行为:通过创建时的标志位控制读写逻辑,例如:
- EFD_SEMAPHORE:使 read() 每次读取后计数器减 1(而非清零),类似信号量的行为。
- EFD_NONBLOCK:设置为非阻塞模式,避免 read()/write() 阻塞。
- EFD_CLOEXEC:进程 exec 时自动关闭该 fd,避免资源泄漏。
3.3.3 系统调用接口
(1)创建eventfd
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
- 参数:
- initval:计数器初始值(通常为 0)。
- flags:标志位(如 EFD_NONBLOCK、EFD_SEMAPHORE 等),0 表示默认行为。
- 返回值:成功返回 eventfd 的文件描述符(非负整数),失败返回 -1 并设置 errno。
(2)写入事件(触发)
通过 write() 向 eventfd 写入一个 64 位无符号整数(uint64_t),内核会将其加到计数器上:
uint64_t value = 1; // 增加 1 到计数器
ssize_t ret = write(efd, &value, sizeof(value));
(3)读取事件(等待)
通过 read()
读取计数器的值,默认会将计数器清零:
uint64_t buf;
ssize_t ret = read(efd, &buf, sizeof(buf));
- 若计数器 > 0,read() 返回 8 字节,buf 为计数器的当前值,之后计数器被重置为 0。
- 若计数器 = 0,阻塞模式下 read() 会阻塞等待;非阻塞模式下返回 -1 并设置 errno = EAGAIN。
- 若创建时指定 EFD_SEMAPHORE,read() 会将计数器减 1 并返回 1(而非清零),适合多线程按顺序处理事件的场景。
3.3.4 典型使用场景
- 线程间事件通知:在多线程程序中(如 Reactor 模式的 Sub Reactor 线程),主线程可通过 eventfd 向子线程发送 “唤醒” 或 “任务就绪” 信号。
- 进程间通信:多个进程可通过共享 eventfd(如通过 fork 继承或 sendmsg 传递 fd)实现简单的事件同步。
- 与 epoll 结合的事件驱动框架:将 eventfd 加入 epoll 监听集合,与网络套接字的 I/O 事件统一处理,避免线程阻塞在 epoll_wait 时无法被外部信号唤醒。
3.3.5 使用eventfd实现多进程/多线程的Reactor
- 多进程:直接使用eventfd替代管道即可。
- 多线程:eventfd仅作为通知机制(无法传递数据),被主线程通知的线程需要通过公共的缓冲区来获取连接套接字。或者采用像多进程一样的方式,让被通知的进程访问ListenSocket并自己获取连接套接字。