[经验分享] 【SUBJECT技术】epoll设计思路及在MSDP子系统中的应用举例 原创

诚迈_雨哥 显示全部楼层 发表于 2024-6-1 16:16:23

一、多路转接I/O – epoll模型

1. 什么是epoll

epoll接口是为解决Linux内核处理大量文件描述符而提出的方案。该接口属于Linux下多路I/O复用接口中select/poll的增强。其经常应用于Linux下高并发服务型程序,特别是在大量并发连接中只有少部分连接处于活跃下的情况 (通常是这种情况),在该情况下能显著的提高程序的CPU利用率。

2. epoll设计思路简介

a. epoll在Linux内核中构建了一个文件系统,该文件系统采用红黑树来构建,红黑树在增加和删除上面的效率极高,因此是epoll高效的原因之一。有兴趣可以百度红黑树了解,但在这里你只需知道其算法效率超高即可。 b. epoll提供了两种触发模式,水平触发(LT)和边沿触发(ET)。当然,涉及到I/O操作也必然会有阻塞和非阻塞两种方案。目前效率相对较高的是 epoll+ET+非阻塞I/O 模型,在具体情况下应该合理选用当前情形中最优的搭配方案。 c. epoll所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于1024,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以下面语句查看,一般来说这个数目和系统内存关系很大。

call_epoll_prodedure.PNG

Epoll 作为一种 IO 复用机制多应用与高并发领域,网上有很多如何使用 epoll 的基础教程,但对于 epoll 中很重要的结构体 epoll_event 讲的都模棱两可,这篇文章将做深入解析

二、epoll 的使用方法

第一步,创建 epoll

首先调用 int epoll_create(int size)创建一个 epoll,返回一个fd_;

第二步,向epoll添加事件

调用 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 为 epoll 注册事件(如果是新建的 epoll 一般 op 选项是EPOLL_CTL_ADD添加事件);

第三步,持续等待产生的事件,然后处理

调用 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)等待事件的到来,得到的结果存储在 event 中;

第四步,从epoll移除要监听的事件

完全处理完毕后,再次调用 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)删除已经注册的事件(op 选项是EPOLL_CTL_DEL); 值得注意的是epoll_wait函数只能获取是否有注册事件发生,至于这个事件到底是什么、从哪个 socket 来、发送的时间、包的大小等等信息,统统不知道。当然调用者和被调用是知道的(提前约定好,并且存放在epoll_event中)

第五步,用 close(fd_) 关闭这个 epoll

epoll_rbtree.PNG

三、epoll_event结构体定义

struct epoll_event 一般是在 <sys/epoll.h> 头文件中定义

struct epoll_event {
    uint32_t events;  // epoll 事件类型,包括可读,可写等
    epoll_data_t data; // 用户数据,可以是一个指针或文件描述符等
};

1. events 字段

表示要监听的事件类型,可以是以下值之一: EPOLLIN:表示对应的文件描述符上有数据可读 EPOLLOUT:表示对应的文件描述符上可以写入数据 EPOLLRDHUP:表示对端已经关闭连接,或者关闭了写操作端的写入 EPOLLPRI:表示有紧急数据可读 EPOLLERR:表示发生错误 EPOLLHUP:表示文件描述符被挂起 EPOLLET:表示将epoll设置为边缘触发模式 EPOLLONESHOT:表示将事件设置为一次性事件

2. data 字段

表示用户数据,它的类型是一个union,可以存放一个指针或文件描述符等数据。它的定义如下:

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

ptr可以指向任何类型的用户数据,fd表示文件描述符,u32和u64分别表示一个32位和64位的无符号整数。使用时,用户可以将自己需要的数据存放到这个字段中,当事件触发时,epoll系统调用会返回这个数据,以便用户处理事件。

四、关于 epoll 的应用实例

MSDP 子系统分别在代理任务DelegateTask,时钟TimerManager,设备上下线管理DeviceManager以及向客户端发通知的地方都用到了这一机制。

EPOLL_IN_MSDP.png

epoll 调用的入口,在 devicestatus_service

// devicestatus_service.h
    int32_t AddEpoll(EpollEventType type, int32_t fd) override;
    int32_t DelEpoll(EpollEventType type, int32_t fd);
    void OnThread();
    void OnDelegateTask(const epoll_event &ev);
    void OnTimeout(const epoll_event &ev);
    void OnDeviceMgr(const epoll_event &ev);

// devicestatus_service.cpp
struct device_status_epoll_event {
    int32_t fd { 0 };
    EpollEventType event_type { EPOLL_EVENT_BEGIN };
};

// DeviceStatusService 初始化时首先被调用
void DeviceStatusService::OnStart()
{
    // 初始化监控对象
    Init();
    state_ = ServiceRunningState::STATE_RUNNING;
    ready_ = true;
    // 开启线程,有事件发生时执行
    worker_ = std::thread(std::bind(&DeviceStatusService::OnThread, this));
}
// 初始化三种典型监控对象
bool DeviceStatusService::Init()
{
    if (!devicestatusManager_->Init()) {
        return false;
    }
    if (EpollCreate() != RET_OK) {
        return false;
    }
    if (InitDelegateTasks() != RET_OK) {
        goto INIT_FAIL;
    }
    if (InitTimerMgr() != RET_OK) {
        goto INIT_FAIL;
    }
   // ...
    return true;
INIT_FAIL:
    EpollClose();
    return false;
}
// 添加 EPOLL_EVENT_ETASK监控对象
int32_t DeviceStatusService::InitDelegateTasks()
{
    if (!delegateTasks_.Init()) {
        return RET_ERR;
    }
    int32_t ret = AddEpoll(EPOLL_EVENT_ETASK, delegateTasks_.GetReadFd());
    return ret;
}
// 添加 EPOLL_EVENT_TIMER监控对象
int32_t DeviceStatusService::InitTimerMgr()
{
    int32_t ret = timerMgr_.Init(this);
    ret = AddEpoll(EPOLL_EVENT_TIMER, timerMgr_.GetTimerFd());
    return ret;
}
// 添加EPOLL_EVENT_DEVICE_MGR监控对象
int32_t DeviceStatusService::EnableDevMgr(int32_t nRetries)
{
    static int32_t timerId { -1 };
    int32_t ret = devMgr_.Enable();
    AddEpoll(EPOLL_EVENT_DEVICE_MGR, devMgr_.GetFd());

    return RET_OK;
}

// AddEpoll 进一步调用 EpollCtl的EPOLL_CTL_ADD操作
int32_t DeviceStatusService::AddEpoll(EpollEventType type, int32_t fd)
{
    auto eventData = static_cast<device_status_epoll_event*>(malloc(sizeof(device_status_epoll_event)));
    eventData->fd = fd;
    eventData->event_type = type;
    struct epoll_event ev {};
    ev.events = EPOLLIN;
    ev.data.ptr = eventData;
    if (EpollCtl(fd, EPOLL_CTL_ADD, ev) != RET_OK) {
        free(eventData);
        eventData = nullptr;
        ev.data.ptr = nullptr;
        return RET_ERR;
    }
    return RET_OK;
}

// 线程内,有事件发生时执行
void DeviceStatusService::OnThread()
{
    SetThreadName(std::string("os_ds_service"));
    uint64_t tid = GetThisThreadId();
    delegateTasks_.SetWorkerThreadId(tid);
    EnableDevMgr(MAX_N_RETRIES);
    while (state_ == ServiceRunningState::STATE_RUNNING) {
        struct epoll_event ev[MAX_EVENT_SIZE] {};
        int32_t count = EpollWait(MAX_EVENT_SIZE, -1, ev[0]);
        for (int32_t i = 0; i < count && state_ == ServiceRunningState::STATE_RUNNING; i++) {
            auto epollEvent = reinterpret_cast<device_status_epoll_event*>(ev[i].data.ptr);
            if (epollEvent->event_type == EPOLL_EVENT_SOCKET) {
                OnEpollEvent(ev[i]);
            } else if (epollEvent->event_type == EPOLL_EVENT_ETASK) {
                OnDelegateTask(ev[i]);
            } else if (epollEvent->event_type == EPOLL_EVENT_TIMER) {
                OnTimeout(ev[i]);
            } else if (epollEvent->event_type == EPOLL_EVENT_DEVICE_MGR) {
                OnDeviceMgr(ev[i]);
            } else {
            }
        }
    }
}

// 事件发生后,read 读取、调度处理

void DeviceStatusService::OnDelegateTask(const struct epoll_event &ev)
{
    if ((ev.events & EPOLLIN) == 0) {
        return;
    }
    DelegateTasks::TaskData data {};
    ssize_t res = read(delegateTasks_.GetReadFd(), &data, sizeof(data));
    delegateTasks_.ProcessTasks();
}
void DeviceStatusService::OnTimeout(const struct epoll_event &ev)
{
    if ((ev.events & EPOLLIN) == EPOLLIN) {
        uint64_t expiration {};
        ssize_t ret = read(timerMgr_.GetTimerFd(), &expiration, sizeof(expiration));
        timerMgr_.ProcessTimers();
    } else if ((ev.events & (EPOLLHUP | EPOLLERR)) != 0) {
    }
}
void DeviceStatusService::OnDeviceMgr(const struct epoll_event &ev)
{
    if ((ev.events & EPOLLIN) == EPOLLIN) {
        devMgr_.Dispatch(ev);
    } else if ((ev.events & (EPOLLHUP | EPOLLERR)) != 0) {
    }
}

// 当取消监控或者正常结束时应删除监控对象


void DeviceStatusService::DisableDevMgr()
{
    DelEpoll(EPOLL_EVENT_DEVICE_MGR, devMgr_.GetFd());
    devMgr_.Disable();
}

int32_t DeviceStatusService::DelEpoll(EpollEventType type, int32_t fd)
{
    struct epoll_event ev {};
    if (EpollCtl(fd, EPOLL_CTL_DEL, ev) != RET_OK) {
        return RET_ERR;
    }
    return RET_OK;
}

epoll 调用的又一个入口,在 StreamServer, 针对EPOLL_EVENT_SOCKET监控对象的

// stream_server.h
enum EpollEventType {
    EPOLL_EVENT_BEGIN = 0,
    EPOLL_EVENT_INPUT = EPOLL_EVENT_BEGIN,
    EPOLL_EVENT_SOCKET,
    EPOLL_EVENT_ETASK,
    EPOLL_EVENT_TIMER,
    EPOLL_EVENT_DEVICE_MGR,
    EPOLL_EVENT_END
};

    virtual int32_t AddEpoll(EpollEventType type, int32_t fd) = 0;
    void ReleaseSession(int32_t fd, epoll_event &ev);
    void OnEpollRecv(int32_t fd, epoll_event &ev);
    void OnEpollEvent(epoll_event &ev);

// stream_server.cpp
void StreamServer::UdsStop()
{
    if (epollFd_ != -1) {
        if (close(epollFd_) < 0) {
        }
        epollFd_ = -1;
    }
}
int32_t StreamServer::AddSocketPairInfo(const std::string &programName, int32_t moduleType, int32_t uid, int32_t pid,
    int32_t &serverFd, int32_t &toReturnClientFd, int32_t &tokenType)
{
    int32_t sockFds[2] = { -1 };
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockFds) != 0) {
        return RET_ERR;
    }
    serverFd = sockFds[0];
    toReturnClientFd = sockFds[1];
    // ...
    if (AddEpoll(EPOLL_EVENT_SOCKET, serverFd) != RET_OK) {
        return CloseFd(serverFd, toReturnClientFd);
    }
    OnConnected(sess);
    return RET_OK;
}

void StreamServer::ReleaseSession(int32_t fd, epoll_event &ev)
{
    //...
    auto DeviceStatusService = DeviceStatus::DelayedSpSingleton<DeviceStatus::DeviceStatusService>::GetInstance();
    DeviceStatusService->DelEpoll(EPOLL_EVENT_SOCKET, fd);
    if (close(fd) < 0) {
    }
}
void StreamServer::OnEpollRecv(int32_t fd, epoll_event &ev)
{
    if (fd < 0) {
        return;
    }
    auto& buf = circleBufs_[fd];
    char szBuf[MAX_PACKET_BUF_SIZE] = { 0 };
    for (int32_t i = 0; i < MAX_RECV_LIMIT; i++) {
        ssize_t size = recv(fd, szBuf, MAX_PACKET_BUF_SIZE, MSG_DONTWAIT | MSG_NOSIGNAL);
       //...
    }
}

void StreamServer::OnEpollEvent(epoll_event &ev)
{
    int32_t fd = *static_cast<int32_t*>(ev.data.ptr);
    if (fd < 0) {
        return;
    }
    if ((ev.events & EPOLLERR) || (ev.events & EPOLLHUP)) {
        ReleaseSession(fd, ev);
    } else if (ev.events & EPOLLIN) {
        OnEpollRecv(fd, ev);
    }
}

epoll 调用的出口,在 StreamSocket

// stream_socket.h
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>

    int32_t EpollCreate();
    int32_t EpollCtl(int32_t fd, int32_t op, struct epoll_event &event);
    int32_t EpollWait(int32_t maxevents, int32_t timeout, struct epoll_event &events);
    void EpollClose();
    int32_t epollFd_ { -1 };

// stream_socket.cpp
StreamSocket::~StreamSocket()
{
    EpollClose();
}
int32_t StreamSocket::EpollCreate()
{
    epollFd_ = ::epoll_create1(EPOLL_CLOEXEC);
    return RET_OK;
}
int32_t StreamSocket::EpollCtl(int32_t fd, int32_t op, struct epoll_event &event)
{
    if (::epoll_ctl(epollFd_, op, fd, &event) != 0) {
        return RET_ERR;
    }
    return RET_OK;
}
int32_t StreamSocket::EpollWait(int32_t maxevents, int32_t timeout, struct epoll_event &events)
{
    return epoll_wait(epollFd_, &events, maxevents, timeout);
}

void StreamSocket::EpollClose()
{
    if (epollFd_ >= 0) {
        if (close(epollFd_) < 0) {
        }
        epollFd_ = -1;
    }
}

参考文献、链接

©著作权归作者所有,转载或内容合作请联系作者

您尚未登录,无法参与评论,登录后可以:
参与开源共建问题交流
认同或收藏高质量问答
获取积分成为开源共建先驱

Copyright   ©2023  OpenHarmony开发者论坛  京ICP备2020036654号-3 |技术支持 Discuz!

返回顶部