OpenHarmony开发者论坛

标题: 【SUBJECT技术】epoll设计思路及在MSDP子系统中的应用举例 [打印本页]

作者: 诚迈_雨哥    时间: 2024-6-1 16:16
标题: 【SUBJECT技术】epoll设计思路及在MSDP子系统中的应用举例
[md]# 一、多路转接I/O – epoll模型

## 1. 什么是epoll

epoll接口是为解决Linux内核处理大量文件描述符而提出的方案。该接口属于Linux下多路I/O复用接口中select/poll的增强。其经常应用于Linux下高并发服务型程序,特别是在大量并发连接中只有少部分连接处于活跃下的情况 (通常是这种情况),在该情况下能显著的提高程序的CPU利用率。

## 2. epoll设计思路简介

a. epoll在Linux内核中构建了一个文件系统,该文件系统采用红黑树来构建,红黑树在增加和删除上面的效率极高,因此是epoll高效的原因之一。有兴趣可以百度红黑树了解,但在这里你只需知道其算法效率超高即可。
b. epoll提供了两种触发模式,水平触发(LT)和边沿触发(ET)。当然,涉及到I/O操作也必然会有阻塞和非阻塞两种方案。目前效率相对较高的是 epoll+ET+非阻塞I/O 模型,在具体情况下应该合理选用当前情形中最优的搭配方案。
c. epoll所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于1024,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以下面语句查看,一般来说这个数目和系统内存关系很大。

![call_epoll_prodedure.PNG](https://forums-obs.openharmony.c ... dpnco5egffyen6x.png "call_epoll_prodedure.PNG")

![](./figures/call_epoll_prodedure.png)

Epoll 作为一种 IO 复用机制多应用与高并发领域,网上有很多如何使用 epoll 的基础教程,但对于 epoll 中很重要的结构体 epoll_event 讲的都模棱两可,这篇文章将做深入解析

# 二、epoll 的使用方法

## 第一步,创建 epoll

首先调用 `int epoll_create(int size)`创建一个 epoll,返回一个fd_;

## 第二步,向epoll添加事件

调用 `int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)` 为 epoll 注册事件(如果是新建的 epoll 一般 op 选项是EPOLL_CTL_ADD添加事件);

## 第三步,持续等待产生的事件,然后处理

调用 `int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)`等待事件的到来,得到的结果存储在 event 中;

## 第四步,从epoll移除要监听的事件

完全处理完毕后,再次调用 `int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)`删除已经注册的事件(op 选项是EPOLL_CTL_DEL);
值得注意的是epoll_wait函数只能获取是否有注册事件发生,至于这个事件到底是什么、从哪个 socket 来、发送的时间、包的大小等等信息,统统不知道。当然调用者和被调用是知道的(提前约定好,并且存放在epoll_event中)

## 第五步,用 close(fd_) 关闭这个 epoll

![epoll_rbtree.PNG](https://forums-obs.openharmony.c ... kk0cl1wclmi5gpk.png "epoll_rbtree.PNG")

![](./figures/epoll_rbtree.png)

# 三、epoll_event结构体定义

struct epoll_event 一般是在 <sys/epoll.h> 头文件中定义

```
struct epoll_event {
    uint32_t events;  // epoll 事件类型,包括可读,可写等
    epoll_data_t data; // 用户数据,可以是一个指针或文件描述符等
};
```

## 1. events 字段

表示要监听的事件类型,可以是以下值之一:
EPOLLIN:表示对应的文件描述符上有数据可读
EPOLLOUT:表示对应的文件描述符上可以写入数据
EPOLLRDHUP:表示对端已经关闭连接,或者关闭了写操作端的写入
EPOLLPRI:表示有紧急数据可读
EPOLLERR:表示发生错误
EPOLLHUP:表示文件描述符被挂起
EPOLLET:表示将epoll设置为边缘触发模式
EPOLLONESHOT:表示将事件设置为一次性事件

## 2. data 字段

表示用户数据,它的类型是一个union,可以存放一个指针或文件描述符等数据。它的定义如下:

```
typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
```

**ptr**可以指向任何类型的用户数据,fd表示文件描述符,u32和u64分别表示一个32位和64位的无符号整数。使用时,用户可以将自己需要的数据存放到这个字段中,当事件触发时,epoll系统调用会返回这个数据,以便用户处理事件。

# 四、关于 epoll 的应用实例

MSDP 子系统分别在代理任务DelegateTask,时钟TimerManager,设备上下线管理DeviceManager以及向客户端发通知的地方都用到了这一机制。

![EPOLL_IN_MSDP.png](https://forums-obs.openharmony.c ... hrmmlzxeeritkme.png "EPOLL_IN_MSDP.png")

![](./figures/EPOLL_IN_MSDP.png)

## epoll 调用的入口,在 devicestatus_service

```cpp
// devicestatus_service.h
    int32_t AddEpoll(EpollEventType type, int32_t fd) override;
    int32_t DelEpoll(EpollEventType type, int32_t fd);
    void OnThread();
    void OnDelegateTask(const epoll_event &ev);
    void OnTimeout(const epoll_event &ev);
    void OnDeviceMgr(const epoll_event &ev);

// devicestatus_service.cpp
struct device_status_epoll_event {
    int32_t fd { 0 };
    EpollEventType event_type { EPOLL_EVENT_BEGIN };
};

// DeviceStatusService 初始化时首先被调用
void DeviceStatusService::OnStart()
{
    // 初始化监控对象
    Init();
    state_ = ServiceRunningState::STATE_RUNNING;
    ready_ = true;
    // 开启线程,有事件发生时执行
    worker_ = std::thread(std::bind(&DeviceStatusService::OnThread, this));
}
// 初始化三种典型监控对象
bool DeviceStatusService::Init()
{
    if (!devicestatusManager_->Init()) {
        return false;
    }
    if (EpollCreate() != RET_OK) {
        return false;
    }
    if (InitDelegateTasks() != RET_OK) {
        goto INIT_FAIL;
    }
    if (InitTimerMgr() != RET_OK) {
        goto INIT_FAIL;
    }
   // ...
    return true;
INIT_FAIL:
    EpollClose();
    return false;
}
// 添加 EPOLL_EVENT_ETASK监控对象
int32_t DeviceStatusService::InitDelegateTasks()
{
    if (!delegateTasks_.Init()) {
        return RET_ERR;
    }
    int32_t ret = AddEpoll(EPOLL_EVENT_ETASK, delegateTasks_.GetReadFd());
    return ret;
}
// 添加 EPOLL_EVENT_TIMER监控对象
int32_t DeviceStatusService::InitTimerMgr()
{
    int32_t ret = timerMgr_.Init(this);
    ret = AddEpoll(EPOLL_EVENT_TIMER, timerMgr_.GetTimerFd());
    return ret;
}
// 添加EPOLL_EVENT_DEVICE_MGR监控对象
int32_t DeviceStatusService::EnableDevMgr(int32_t nRetries)
{
    static int32_t timerId { -1 };
    int32_t ret = devMgr_.Enable();
    AddEpoll(EPOLL_EVENT_DEVICE_MGR, devMgr_.GetFd());

    return RET_OK;
}

// AddEpoll 进一步调用 EpollCtl的EPOLL_CTL_ADD操作
int32_t DeviceStatusService::AddEpoll(EpollEventType type, int32_t fd)
{
    auto eventData = static_cast<device_status_epoll_event*>(malloc(sizeof(device_status_epoll_event)));
    eventData->fd = fd;
    eventData->event_type = type;
    struct epoll_event ev {};
    ev.events = EPOLLIN;
    ev.data.ptr = eventData;
    if (EpollCtl(fd, EPOLL_CTL_ADD, ev) != RET_OK) {
        free(eventData);
        eventData = nullptr;
        ev.data.ptr = nullptr;
        return RET_ERR;
    }
    return RET_OK;
}

// 线程内,有事件发生时执行
void DeviceStatusService::OnThread()
{
    SetThreadName(std::string("os_ds_service"));
    uint64_t tid = GetThisThreadId();
    delegateTasks_.SetWorkerThreadId(tid);
    EnableDevMgr(MAX_N_RETRIES);
    while (state_ == ServiceRunningState::STATE_RUNNING) {
        struct epoll_event ev[MAX_EVENT_SIZE] {};
        int32_t count = EpollWait(MAX_EVENT_SIZE, -1, ev[0]);
        for (int32_t i = 0; i < count && state_ == ServiceRunningState::STATE_RUNNING; i++) {
            auto epollEvent = reinterpret_cast<device_status_epoll_event*>(ev.data.ptr);
            if (epollEvent->event_type == EPOLL_EVENT_SOCKET) {
                OnEpollEvent(ev);
            } else if (epollEvent->event_type == EPOLL_EVENT_ETASK) {
                OnDelegateTask(ev);
            } else if (epollEvent->event_type == EPOLL_EVENT_TIMER) {
                OnTimeout(ev);
            } else if (epollEvent->event_type == EPOLL_EVENT_DEVICE_MGR) {
                OnDeviceMgr(ev);
            } else {
            }
        }
    }
}
```

// 事件发生后,read 读取、调度处理

```cpp
void DeviceStatusService::OnDelegateTask(const struct epoll_event &ev)
{
    if ((ev.events & EPOLLIN) == 0) {
        return;
    }
    DelegateTasks::TaskData data {};
    ssize_t res = read(delegateTasks_.GetReadFd(), &data, sizeof(data));
    delegateTasks_.ProcessTasks();
}
void DeviceStatusService::OnTimeout(const struct epoll_event &ev)
{
    if ((ev.events & EPOLLIN) == EPOLLIN) {
        uint64_t expiration {};
        ssize_t ret = read(timerMgr_.GetTimerFd(), &expiration, sizeof(expiration));
        timerMgr_.ProcessTimers();
    } else if ((ev.events & (EPOLLHUP | EPOLLERR)) != 0) {
    }
}
void DeviceStatusService::OnDeviceMgr(const struct epoll_event &ev)
{
    if ((ev.events & EPOLLIN) == EPOLLIN) {
        devMgr_.Dispatch(ev);
    } else if ((ev.events & (EPOLLHUP | EPOLLERR)) != 0) {
    }
}

```

// 当取消监控或者正常结束时应删除监控对象

```

void DeviceStatusService:isableDevMgr()
{
    DelEpoll(EPOLL_EVENT_DEVICE_MGR, devMgr_.GetFd());
    devMgr_.Disable();
}

int32_t DeviceStatusService:elEpoll(EpollEventType type, int32_t fd)
{
    struct epoll_event ev {};
    if (EpollCtl(fd, EPOLL_CTL_DEL, ev) != RET_OK) {
        return RET_ERR;
    }
    return RET_OK;
}
```

## epoll 调用的又一个入口,在 StreamServer, 针对EPOLL_EVENT_SOCKET监控对象的

```cpp
// stream_server.h
enum EpollEventType {
    EPOLL_EVENT_BEGIN = 0,
    EPOLL_EVENT_INPUT = EPOLL_EVENT_BEGIN,
    EPOLL_EVENT_SOCKET,
    EPOLL_EVENT_ETASK,
    EPOLL_EVENT_TIMER,
    EPOLL_EVENT_DEVICE_MGR,
    EPOLL_EVENT_END
};

    virtual int32_t AddEpoll(EpollEventType type, int32_t fd) = 0;
    void ReleaseSession(int32_t fd, epoll_event &ev);
    void OnEpollRecv(int32_t fd, epoll_event &ev);
    void OnEpollEvent(epoll_event &ev);

// stream_server.cpp
void StreamServer::UdsStop()
{
    if (epollFd_ != -1) {
        if (close(epollFd_) < 0) {
        }
        epollFd_ = -1;
    }
}
int32_t StreamServer::AddSocketPairInfo(const std::string &programName, int32_t moduleType, int32_t uid, int32_t pid,
    int32_t &serverFd, int32_t &toReturnClientFd, int32_t &tokenType)
{
    int32_t sockFds[2] = { -1 };
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockFds) != 0) {
        return RET_ERR;
    }
    serverFd = sockFds[0];
    toReturnClientFd = sockFds[1];
    // ...
    if (AddEpoll(EPOLL_EVENT_SOCKET, serverFd) != RET_OK) {
        return CloseFd(serverFd, toReturnClientFd);
    }
    OnConnected(sess);
    return RET_OK;
}

void StreamServer::ReleaseSession(int32_t fd, epoll_event &ev)
{
    //...
    auto DeviceStatusService = DeviceStatus:elayedSpSingleton<DeviceStatus:eviceStatusService>::GetInstance();
    DeviceStatusService->DelEpoll(EPOLL_EVENT_SOCKET, fd);
    if (close(fd) < 0) {
    }
}
void StreamServer::OnEpollRecv(int32_t fd, epoll_event &ev)
{
    if (fd < 0) {
        return;
    }
    auto& buf = circleBufs_[fd];
    char szBuf[MAX_PACKET_BUF_SIZE] = { 0 };
    for (int32_t i = 0; i < MAX_RECV_LIMIT; i++) {
        ssize_t size = recv(fd, szBuf, MAX_PACKET_BUF_SIZE, MSG_DONTWAIT | MSG_NOSIGNAL);
       //...
    }
}

void StreamServer::OnEpollEvent(epoll_event &ev)
{
    int32_t fd = *static_cast<int32_t*>(ev.data.ptr);
    if (fd < 0) {
        return;
    }
    if ((ev.events & EPOLLERR) || (ev.events & EPOLLHUP)) {
        ReleaseSession(fd, ev);
    } else if (ev.events & EPOLLIN) {
        OnEpollRecv(fd, ev);
    }
}
```

## epoll 调用的出口,在 StreamSocket

```cpp
// stream_socket.h
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>

    int32_t EpollCreate();
    int32_t EpollCtl(int32_t fd, int32_t op, struct epoll_event &event);
    int32_t EpollWait(int32_t maxevents, int32_t timeout, struct epoll_event &events);
    void EpollClose();
    int32_t epollFd_ { -1 };

// stream_socket.cpp
StreamSocket::~StreamSocket()
{
    EpollClose();
}
int32_t StreamSocket::EpollCreate()
{
    epollFd_ = ::epoll_create1(EPOLL_CLOEXEC);
    return RET_OK;
}
int32_t StreamSocket::EpollCtl(int32_t fd, int32_t op, struct epoll_event &event)
{
    if (::epoll_ctl(epollFd_, op, fd, &event) != 0) {
        return RET_ERR;
    }
    return RET_OK;
}
int32_t StreamSocket::EpollWait(int32_t maxevents, int32_t timeout, struct epoll_event &events)
{
    return epoll_wait(epollFd_, &events, maxevents, timeout);
}

void StreamSocket::EpollClose()
{
    if (epollFd_ >= 0) {
        if (close(epollFd_) < 0) {
        }
        epollFd_ = -1;
    }
}
```

# 参考文献、链接

* epoll_event结构体定义:https://blog.csdn.net/qq_41413211/article/details/129310969
* Epoll的本质(原理): https://blog.csdn.net/bandaoyu/article/details/106851848
* 深入理解 Epoll: https://zhuanlan.zhihu.com/p/93609693
* epoll相关函数的调用及说明:https://blog.csdn.net/qq_42474104/article/details/124917220
[/md]




欢迎光临 OpenHarmony开发者论坛 (https://forums.openharmony.cn/) Powered by Discuz! X3.5