概述
select,poll,epoll 都是 IO 多路复用的机制。I/O 多路复用就是通过一种机制,可以监视多个描述符(socket, file, tunnel),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
select 有 3 个缺点:
- 连接数受限,可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
- 查找配对速度慢,调用select()会对所有socket进行一次线性扫描。
- 数据由内核拷贝到用户态
!! poll改善了第一个缺点
!!!epoll改了三个缺点.
进一步解析
两种触发
水平触发(Level Triggered)
select() 和 poll() 将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用 select() 和 poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息
边缘触发(Edge Triggered)
只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发
select实现
select的调用过程如下所示:
- 使用
copy_from_user
从用户空间拷贝fd_set
到内核空间 - 注册回调函数
__pollwait
- 遍历所有
fd
,调用其对应的poll
方法(对于 socket,这个poll
方法是sock_poll
,sock_poll
根据情况会调用到tcp_poll
,udp_poll
或者datagram_poll
) - 以
tcp_poll
为例,其核心实现就是__pollwait
,也就是上面注册的回调函数。 __pollwait
的主要工作就是把 current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll
来说,其等待队列是 sk->sk_sleep (注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时 current 便被唤醒了。poll
方法返回时会返回一个描述读写操作是否就绪的 mask 掩码,根据这个 mask 掩码给fd_set
赋值。- 如果遍历完所有的
fd
,还没有返回一个可读写的 mask 掩码,则会调用 schedule_timeout 是调用select
的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select
的进程会重新被唤醒获得CPU,进而重新遍历fd
,判断有没有就绪的fd
。 - 把
fd_set
从内核空间拷贝到用户空间。
poll实现
poll
的实现和 select
非常相似,只是描述 fd
集合的方式不同,poll
使用 pollfd
结构而不是 select
的 fd_set
结构,其他的都差不多。
epoll
相比于 select 和 poll,epoll 的改进可以归结为 2 点:
epoll 和 poll 一样只告知那些就绪的文件描述符,而且当我们调用
epoll_wait()
获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去 epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而 epoll 事先通过
epoll_ctl()
来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()
时便得到通知
epoll
既然是对 select
和 poll
的改进,就应该能避免上述的三个缺点。那 epoll
都是怎么解决的呢?在此之前,我们先看一下 epoll
和 select
和 poll
的调用接口上的不同,select 和 poll 都只提供了一个函数—— select
或者 poll
函数。而epoll 提供了三个函数,epoll_create
, epoll_ctl
和 epoll_wait
,epoll_create
是创建一个 epoll 句柄;epoll_ctl
是注册要监听的事件类型;epoll_wait
则是等待事件的产生。
对于第一个缺点,epoll 的解决方案在
epoll_ctl
函数中。每次注册新的事件到 epoll 句柄中时(在epoll_ctl
中指定EPOLL_CTL_ADD
),会把所有的 fd 拷贝进内核,而不是在epoll_wait
的时候重复拷贝。epoll 保证了每个 fd 在整个过程中只会拷贝一次。对于第二个缺点,epoll 的解决方案不像 select 或 poll 一样每次都把 current 轮流加入 fd 对应的设备等待队列中,而只在
epoll_ctl
时把 current 挂一遍(这一遍必不可少)并为每个 fd 指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait
的工作实际上就是在这个就绪链表中查看有没有就绪的 fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和 select 实现中的第7步是类似的)。对于第三个缺点,epoll 没有这个限制,它所支持的 fd 上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以
cat /proc/sys/fs/file-max
察看,一般来说这个数目和系统内存关系很大。
总结:
select,poll 实现需要自己不断轮询所有 fd 集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而 epoll 其实也需要调用
epoll_wait
不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪 fd 放入就绪链表中,并唤醒在epoll_wait
中进入睡眠的进程。虽然都要睡眠和交替,但是 select 和 poll 在“醒着”的时候要遍历整个 fd 集合,而 epoll 在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。select,poll 每次调用都要把 fd 集合从用户态往内核态拷贝一次,并且要把 current 往设备等待队列中挂一次,而 epoll 只要一次拷贝,而且把 current 往等待队列上挂也只挂一次(在
epoll_wait
的开始,注意这里的等待队列并不是设备等待队列,只是一个 epoll 内部定义的等待队列)。这也能节省不少的开销。
代码示例
select
这里为了来个实践,就以 python 语言为例,给个示例,因为 select 和 poll 差不多,这里就只给 select 的例子了,然后 epoll 也给一个,需要注意的是 epoll 只能在 Linux 的机器上使用,所以,重复实验的时候需要注意到这一点。
select_server.py
#!/usr/bin/env python
# encoding: utf-8
import sys
import cPickle
import struct
import socket
import select
import logging
from threading import Event
host = '0.0.0.0'
port = 12007
class SelectServer(object):
def __init__(self):
self.stop = Event()
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((host, port))
self.server.listen(10)
def send(self, channel, *args):
buffer = cPickle.dumps(args)
value = socket.htonl(len(buffer))
size = struct.pack('L', value)
channel.send(size)
channel.send(buffer)
def receive(self, channel):
size = struct.calcsize('L')
size = channel.recv(size)
try:
size = socket.ntohl(struct.unpack("L", size)[0])
except struct.error, e:
logging.error(e)
return ''
buf = ''
while len(buf) < size:
buf += channel.recv(size-len(buf))
return cPickle.loads(buf)[0]
def stop_server(self):
for output in self.outputs:
output.close()
self.stop.set()
def run(self):
inputs = [self.server, sys.stdin]
self.outputs = []
while not self.stop.is_set():
try:
readable, writeable, exceptional = select.select(
inputs, self.outputs, [])
except select.error, e:
logging.error(e)
break
for sock in readable:
if sock == self.server:
client, address = self.server.accept()
text = self.receive(client)
self.send(client, text)
print("receive {} from {}".format(text, address))
self.outputs.append(client)
inputs.append(client)
elif sock == sys.stdin:
junk = sys.stdin.readline()
if junk.strip() == 'stop':
self.stop_server()
else:
try:
data = self.receive(sock)
if data:
print("receive {} from {}".format(data, sock))
self.send(sock, data)
else:
sock.close()
inputs.remove(sock)
self.outputs.remove(sock)
except socket.error, e:
logging.error(e)
inputs.remove(sock)
self.outputs.remove(sock)
self.server.close()
if __name__ == "__main__":
SelectServer().run()
select_client.py
#!/usr/bin/env python
# encoding: utf-8
import time
import logging
import random
import struct
import cPickle
import socket
host = '0.0.0.0'
port = 12007
class SelectClient(object):
def __init__(self):
self.clients = []
self.connections = 10
def send(self, channel, *args):
buffer = cPickle.dumps(args)
value = socket.htonl(len(buffer))
size = struct.pack('L', value)
channel.send(size)
channel.send(buffer)
def receive(self, channel):
size = struct.calcsize('L')
size = channel.recv(size)
try:
size = socket.socket.ntohl(struct.unpack("L", size)[0])
except struct.error, e:
logging.error(e)
return ''
buf = ''
while len(buf) < size:
buf += channel.recv(size-len(buf))
return cPickle.loads(buf)[0]
def run(self):
for i in xrange(self.connections):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
self.clients.append(client)
for i in xrange(100):
idx = random.randint(0, 9)
self.send(self.clients[idx], 'message {}'.format(i))
time.sleep(10)
for client in self.clients:
client.close()
if __name__ == '__main__':
SelectClient().run()
这一对 client 和 server 比较简单,就是服务器从客户端接收消息,然后再把消息打印出来,而客户端就 10 个客户端随机选择出来发送消息。如果不使用 select 的话,我们以前的通用操作方法可能是使用 threading 来进行多条线程监控多个 client,但是,这里使用 select 的话可以很简单得处理很多 client。
epoll_server.py
#!/usr/bin/env python
# encoding: utf-8
import socket
import select
from threading import Event
host = "0.0.0.0"
port = 12008
class EpollServer(object):
def __init__(self):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((host, port))
self.server.listen(1)
self.server.setblocking(0)
self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.epoll = select.epoll()
self.epoll.register(self.server.fileno(), select.EPOLLIN)
self.stop = Event()
def run(self):
try:
conns = {}
reqs = {}
resps = {}
while not self.stop.is_set():
events = self.epoll.poll(1)
for fileno, event in events:
if fileno == self.server.fileno():
conn, addr = self.server.accept()
conn.setblocking(0)
self.epoll.register(conn.fileno(), select.EPOLLIN)
conns[conn.fileno()] = conn
reqs[conn.fileno()] = b''
resps[conn.fileno()] = "hello world"
elif event & select.EPOLLIN:
data = conns[fileno].recv(1024)
print "receive data {} from {}".format(data, fileno)
reqs[fileno] += data
if len(data) < 1024:
self.epoll.modify(fileno, select.EPOLLOUT)
elif event & select.EPOLLOUT:
bytes_writen = conns[fileno].send(resps[fileno])
resps[fileno] = resps[fileno][bytes_writen:]
if len(resps[fileno]) == 0:
self.epoll.modify(fileno, select.EPOLLIN)
elif event & select.EPOLLHUP:
self.epoll.unregister(fileno)
conns[fileno].close()
del conns[fileno]
finally:
self.epoll.unregister(self.server.fileno())
self.epoll.close()
self.server.close()
if __name__ == "__main__":
server = EpollServer()
server.run()
epoll_client.py
#!/usr/bin/env python
# encoding: utf-8
import time
import random
import socket
host = '0.0.0.0'
port = 12008
class SelectClient(object):
def __init__(self):
self.clients = []
self.connections = 10
def send(self, channel, data):
channel.send(data)
def receive(self, channel):
pass
def run(self):
for i in xrange(self.connections):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
self.clients.append(client)
for i in xrange(100):
idx = random.randint(0, 9)
self.send(self.clients[idx], 'message {}'.format(i))
time.sleep(3)
for client in self.clients:
client.close()
if __name__ == '__main__':
SelectClient().run()
这里的 epoll 和 select 的使用方式就不一样了,这里不再是传递一个 fd 列表了,而是给每个 fd register
一个关注的事件,是入事件还是出事件,然后根据 fd 和 event 来处理。需要注意的是,这里的 Python 代码和前面所说的底层实现有一点差别,例如没有 epoll_create
函数之类的,反之,而是 epoll.register
和 epoll.poll
之类的。