上一篇文章已经介绍了IO 多路复用的含义,这里着重看一下linux系统多路复用实现的API
select
select 使用文档在:select(2) - Linux manual page
接口定义如下:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// nfds readfds、writefds、exceptfds 中编号最大的那个文件描述符+1
// readfds监听读操作的文件描述符列表
// writefds 是监听写操作的文件描述符列表
// exceptfds 是监听出现异常的文件描述符列表
// timeout 是 select 最大阻塞时间长度,配置的最小时间精度是毫秒
select 返回条件
- 有文件描述符就绪,可读、可写或异常
- 线程被 interrupt
- 已经阻塞 timeout 时间
select 存在的问题
- fd_set 是一个1024长度的bitmap,也就是说接受的文件描述符为0~1023,是一个有限大小的数组
- 在select函数返回后,用户需要遍历所有的描述符列表集合,看看哪个文件描述符列表处于就绪状态,做完IO操作后还需要清理文件描述符的状态
- 同理在内核中也需要遍历三个文件描述符列表为文件描述符注册监听,设置处理函数等;在select返回之前仍然需要清除处理函数
- 有惊群效应(同一个文件描述符被多个进程的select函数注册后,一旦ready会惊动所有进程或者线程)
- 无法动态添加文件描述符(需要等待select返回后才能添加
示例代码
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <wait.h>
#include <signal.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
#define MAXBUF 256
int main()
{
char buffer[MAXBUF];
int fds[5]; // 长度为5的fds数组
struct sockaddr_in addr;
struct sockaddr_in client;
int addrlen, n,i,max=0;;
int sockfd, commfd;
fd_set rset;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
memset(&addr, 0, sizeof (addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(2000);
addr.sin_addr.s_addr = INADDR_ANY;
bind(sockfd,(struct sockaddr*)&addr ,sizeof(addr));
listen (sockfd, 5);
for (i=0;i<5;i++)
{
memset(&client, 0, sizeof (client));
addrlen = sizeof(client);
// 创建文件描述符
fds[i] = accept(sockfd,(struct sockaddr*)&client, &addrlen);
if(fds[i] > max)
// fds 的最大值
max = fds[i];
}
// 开始
while(1){
FD_ZERO(&rset); // 对rset进行重新置位
for (i = 0; i< 5; i++ ) {
// 构造bitset
FD_SET(fds[i],&rset);
}
puts("round again");
// 文件描述符最大值+1
// 读文件描述符bitMap集合
// 写和异常不传默认为null,超时时间也可以不传有默认值
select(max+1, &rset, NULL, NULL, NULL);
for(i=0;i<5;i++) {
// 假如fd可读
if (FD_ISSET(fds[i], &rset)){
memset(buffer,0,MAXBUF); // 创建用户缓冲区
read(fds[i], buffer, MAXBUF); // 读数据
puts(buffer); // 进行业务处理
}
}
}
return 0;
}
poll
poll的使用文档在 poll(2) — Linux manual page
接口定义如下:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// fds是一个pollfd数组
// nfds 是 fds 数组的长度,struct pollfd 定义如下
// timeout函数的超时时间
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 关注的事件类型,多个时 与操作 */
short revents; /* 响应的事件 */
};
poll 的返回条件与 select 一样
poll与select函数改进的点
- 改进了select的bitset,长度是一个数组,没有上限
- events和revents单独分开,所以如果event没有变化就可以重用fds
存在的问题
- ready返回后需要重新遍历fds寻找发生的事件
- 同时需要清理revents(置位)
- 同样存在惊群问题
- 同样存在无法动态修改描述符的问题
示例代码
for (i=0;i<5;i++)
{
memset(&client, 0, sizeof (client));
addrlen = sizeof(client);
pollfds[i].fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
pollfds[i].events = POLLIN;
}
sleep(1);
while(1){
puts("round again");
// 调用poll函数 pollfds fds数组,5为fds数组长度
poll(pollfds, 5, 50000);
for(i=0;i<5;i++) {
// 遍历数组的revets
if (pollfds[i].revents & POLLIN){
pollfds[i].revents = 0; // 对revents进行置位
memset(buffer,0,MAXBUF); // 用户缓冲区
read(pollfds[i].fd, buffer, MAXBUF); // 读取数据
puts(buffer); // 处理数据
}
}
}
epoll
使用文档:epoll(7) — Linux manual page
接口定义如下:
int epoll_create(int size); // 创建epoll的描述符
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // 注册fd及其事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// 其中 epoll_event 定义如下
typedef union epoll_data {
void *ptr;
int fd; // 文件描述符
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll的主要流程
- 使用epoll_create创建一个epoll描述符
- 使用epoll_ctl注册fd及其需要监听的事件
- 调用epoll_wait 等着被监听的描述符Ready,有fd数据到来时返回ready的数量,遍历 Ready 的描述符,根据 Ready 的事件类型处理事件
- 使用epoll_ctl可以移除不需要监听的fd文件描述符
epoll优点
- 解决了poll函数每次遍历所有文件描述符的问题
- 每次执行 epoll_wait 后用户侧不用重新配置监听
- 通过 epoll_ctl 支持动态增减监听的描述符
epoll缺点
- epoll_ctl 是个系统调用,没有提供批量操作的方法
- 依然存在惊群问题
示例代码
struct epoll_event events[5];
int epfd = epoll_create(10); // 创建一个epoll描述符
...
...
for (i=0;i<5;i++)
{
static struct epoll_event ev;
memset(&client, 0, sizeof (client));
addrlen = sizeof(client);
ev.data.fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
ev.events = EPOLLIN;
// 依次注册fd文件描述符及其监听的事件
epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
}
while(1){
puts("round again");
// 调用epoll方法,返回ready的fd文件描述符个数
nfds = epoll_wait(epfd, events, 5, 10000);
for(i=0;i<nfds;i++) { // 遍历前ndfs个fd
memset(buffer,0,MAXBUF); // 用户态缓冲区
read(events[i].data.fd, buffer, MAXBUF); // 读取数据
puts(buffer); // 处理数据
}
}
Epoll的 Eage-Trigger 和 Level-Trigger
epoll的epoll_wait方法有两种触发模式:边缘触发(Eage-Trigger)和水平触发(Level-Trigger);从使用角度来说这两种主要有以下区别:
- 对于ET(Eage-Trigger)模式 ,当一个FD文件描述符Ready之后,就需要操作这个FD,如果不处理下次epoll_wait不会响应此事件通知
- 对于LT(Level-Trigger)模式,当一个FD文件描述符Ready之后,不需要立即处理这个FD,如果不处理下次epoll_wait仍然会响应此事件通知
默认情况下使用LT模式,支持block和no-block socket,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的;ET(edge-triggered)是高速工作方式,只支持no-block socket,当描述符就绪时内核只会通知一次,一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会再次发送通知
ET模式减少了触发epoll事件的次数,从而效率有所提高;但使用此模式时必须要使用no block的套接字,而避免由于一个文件句柄的阻塞读、写把处理多个任务的任务造成饥饿
如何解决惊群效应
当使用单个线程监听epoll_wait 方法返回是没有惊群效应的,当引入多线程监听epoll_wait 时就可能会遇到惊群的问题。
解决惊群问题有两种方式:
- 采用单线程epoll_wait(监听到事件后分发协程处理)
- 采用定点唤醒某个线程
关于为什么用协程的可以看这里:协程和IO多路复用更配哦
Java中的Selector
Java中nio包中提供了一个Selector类,用于跨平台的IO多路复用。在 BSD 系统上它背后对应的就是 Kqueue,在 Windows 上对应的是 Select,在 Linux 上对应的是 Level Trigger 的 Epoll。(Linux 上为什么是 Level Trigger?是为了抽象统一,因为Windows的Select是Level Trigger的)
为何netty单独实现了linux的EpollEventLoop 而不只是提供 NioEventLoop ? 因为netty想支持 Edge Trigger和一些独立的参数
Netty's epoll transport uses epoll edge-triggered while java's nio library uses level-triggered. Beside this the epoll transport expose configuration options that are not present with java's nio like TCP_CORK, SO_REUSEPORT and more.
nio - Why native epoll support is introduced in Netty? - Stack Overflow
使用示例:
- 使用 Selector.open() 创建一个 Selector
- 使用 ServerSocketChannel.open() 创建SelectableChannel,可以理解为Socket,配置Channel为非阻塞
- 使用 ServerSocketChannel 的register方法将channel注册到selector上,配置需要响应的事件
- 调用 Selector 上的 select() 等待有 Channel 上有 Event 产生
- 当 select() 返回并且大于0,说明有 Channel 有 Event 产生,通过 Selector 获取 SelectionKey,处理事件
- 从 select() 返回的 Iterator 中移除处理完的 SelectionKey
示例代码
public NioServer(int port) throws Exception {
selector = Selector.open(); // 打开一个选择器 选择器能够监听是否有感兴趣的事情发生
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false); // 非阻塞
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册感兴趣的事件
System.out.println("--server --- start----");
}
// 启动nio服务的监听处理
public void start() throws Exception {
while (true) { //
int selects = selector.select(); // 没有敢兴趣的事件 阻塞 --》有那么selects > 0
if (selects <= 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取已经选择器集合
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
handle(selectionKey); //处理 ,路由操作
it.remove();
}
}
}
windows的IOCP
IOCP 是 Windows 下异步 IO 的接口,和多路复用不同,Java 下用到 IOCP 的是 NIO.2 的 AIO,AsynchronousChannelGroup (Java Platform SE 8 ),也就是说 AIO 在 Windows 上才对应着 IOCP,而且IOCP比较复杂,这里暂时不深入了解IOCP