select,poll,epoll 都是IO多路复用的机制。I/O多路复用就是通过一种机制,可以监视多个描述符(socket, file, tunnel),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

select 有3个缺点:

  1. 连接数受限,可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
  2. 查找配对速度慢,调用select()会对所有socket进行一次线性扫描。
  3. 数据由内核拷贝到用户态

!! poll改善了第一个缺点

!!!epoll改了三个缺点.

进一步解析

两种触发

select实现

select的调用过程如下所示:

select_activity.png

  1. 使用 copy_from_user 从用户空间拷贝 fd_set 到内核空间
  2. 注册回调函数 __pollwait
  3. 遍历所有 fd,调用其对应的 poll 方法(对于 socket,这个 poll 方法是 sock_pollsock_poll 根据情况会调用到 tcp_poll , udp_poll 或者 datagram_poll
  4. tcp_poll 为例,其核心实现就是 __pollwait,也就是上面注册的回调函数。
  5. __pollwait 的主要工作就是把 current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于 tcp_poll 来说,其等待队列是 sk->sk_sleep (注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时 current 便被唤醒了。
  6. poll 方法返回时会返回一个描述读写操作是否就绪的 mask 掩码,根据这个 mask 掩码给 fd_set 赋值。
  7. 如果遍历完所有的 fd,还没有返回一个可读写的 mask 掩码,则会调用 schedule_timeout 是调用 select 的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用 select 的进程会重新被唤醒获得CPU,进而重新遍历 fd,判断有没有就绪的 fd
  8. fd_set 从内核空间拷贝到用户空间。

poll实现

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 selectfd_set 结构,其他的都差不多。

epoll

相比于 selectpollepoll 的改进可以归结为 2 点:

  1. epollpoll 一样只告知那些就绪的文件描述符,而且当我们调用 epoll_wait() 获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去 epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销

  2. 另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而 epoll 事先通过 epoll_ctl() 来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个文件描述符,当进程调用 epoll_wait() 时便得到通知

epoll 既然是对 selectpoll 的改进,就应该能避免上述的三个缺点。那 epoll 都是怎么解决的呢?在此之前,我们先看一下 epollselectpoll 的调用接口上的不同,selectpoll 都只提供了一个函数—— select 或者 poll 函数。而epoll 提供了三个函数,epoll_create, epoll_ctlepoll_waitepoll_create 是创建一个 epoll 句柄;epoll_ctl 是注册要监听的事件类型;epoll_wait 则是等待事件的产生。

  1. 对于第一个缺点,epoll 的解决方案在 epoll_ctl 函数中。每次注册新的事件到 epoll 句柄中时(在 epoll_ctl 中指定 EPOLL_CTL_ADD),会把所有的 fd 拷贝进内核,而不是在 epoll_wait 的时候重复拷贝。epoll 保证了每个 fd 在整个过程中只会拷贝一次。

  2. 对于第二个缺点,epoll 的解决方案不像 selectpoll 一样每次都把 current 轮流加入 fd 对应的设备等待队列中,而只在 epoll_ctl 时把 current 挂一遍(这一遍必不可少)并为每个 fd 指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait 的工作实际上就是在这个就绪链表中查看有没有就绪的 fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和 select 实现中的第7步是类似的)。

  3. 对于第三个缺点,epoll 没有这个限制,它所支持的 fd 上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以 cat /proc/sys/fs/file-max 察看,一般来说这个数目和系统内存关系很大。

总结:

  1. selectpoll 实现需要自己不断轮询所有 fd 集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而 epoll 其实也需要调用 epoll_wait 不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪 fd 放入就绪链表中,并唤醒在 epoll_wait 中进入睡眠的进程。虽然都要睡眠和交替,但是 selectpoll 在“醒着”的时候要遍历整个 fd 集合,而 epoll 在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。

  2. selectpoll 每次调用都要把 fd 集合从用户态往内核态拷贝一次,并且要把 current 往设备等待队列中挂一次,而 epoll 只要一次拷贝,而且把 current 往等待队列上挂也只挂一次(在 epoll_wait 的开始,注意这里的等待队列并不是设备等待队列,只是一个 epoll 内部定义的等待队列)。这也能节省不少的开销。

python 代码示例

这里为了来个实践,就以 python 语言为例,给个示例,因为 selectpoll 差不多,这里就只给 select 的例子了,然后 epoll 也给一个,需要注意的是 epoll 只能在 Linux 的机器上使用,所以,重复实验的时候需要注意到这一点。

select_server.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python
# encoding: utf-8
import sys
import cPickle
import struct
import socket
import select
import logging
from threading import Event


host = '0.0.0.0'
port = 12007


class SelectServer(object):
    def __init__(self):
        self.stop = Event()
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((host, port))
        self.server.listen(10)

    def send(self, channel, *args):
        buffer = cPickle.dumps(args)
        value = socket.htonl(len(buffer))
        size = struct.pack('L', value)
        channel.send(size)
        channel.send(buffer)

    def receive(self, channel):
        size = struct.calcsize('L')
        size = channel.recv(size)
        try:
            size = socket.ntohl(struct.unpack("L", size)[0])
        except struct.error, e:
            logging.error(e)
            return ''
        buf = ''
        while len(buf) < size:
            buf += channel.recv(size-len(buf))
        return cPickle.loads(buf)[0]

    def stop_server(self):
        for output in self.outputs:
            output.close()
        self.stop.set()

    def run(self):
        inputs = [self.server, sys.stdin]
        self.outputs = []
        while not self.stop.is_set():
            try:
                readable, writeable, exceptional = select.select(
                    inputs, self.outputs, [])
            except select.error, e:
                logging.error(e)
                break
            for sock in readable:
                if sock == self.server:
                    client, address = self.server.accept()
                    text = self.receive(client)
                    self.send(client, text)
                    print("receive {} from {}".format(text, address))
                    self.outputs.append(client)
                    inputs.append(client)
                elif sock == sys.stdin:
                    junk = sys.stdin.readline()
                    if junk.strip() == 'stop':
                        self.stop_server()
                else:
                    try:
                        data = self.receive(sock)
                        if data:
                            print("receive {} from {}".format(data, sock))
                            self.send(sock, data)
                        else:
                            sock.close()
                            inputs.remove(sock)
                            self.outputs.remove(sock)
                    except socket.error, e:
                        logging.error(e)
                        inputs.remove(sock)
                        self.outputs.remove(sock)
        self.server.close()

if __name__ == "__main__":
    SelectServer().run()

select_client.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
# encoding: utf-8
import time
import logging
import random
import struct
import cPickle
import socket


host = '0.0.0.0'
port = 12007

class SelectClient(object):
    def __init__(self):
        self.clients = []
        self.connections = 10

    def send(self, channel, *args):
        buffer = cPickle.dumps(args)
        value = socket.htonl(len(buffer))
        size = struct.pack('L', value)
        channel.send(size)
        channel.send(buffer)

    def receive(self, channel):
        size = struct.calcsize('L')
        size = channel.recv(size)
        try:
            size = socket.socket.ntohl(struct.unpack("L", size)[0])
        except struct.error, e:
            logging.error(e)
            return ''
        buf = ''
        while len(buf) < size:
            buf += channel.recv(size-len(buf))
        return cPickle.loads(buf)[0]

    def run(self):
        for i in xrange(self.connections):
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect((host, port))
            self.clients.append(client)

        for i in xrange(100):
            idx = random.randint(0, 9)
            self.send(self.clients[idx], 'message {}'.format(i))

        time.sleep(10)

        for client in self.clients:
            client.close()

if __name__ == '__main__':
    SelectClient().run()

这一对 clientserver 比较简单,就是服务器从客户端接收消息,然后再把消息打印出来,而客户端就 10 个客户端随机选择出来发送消息。如果不使用 select 的话,我们以前的通用操作方法可能是使用 threading 来进行多条线程监控多个 client,但是,这里使用 select 的话可以很简单得处理很多 client

epoll_server.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/usr/bin/env python
# encoding: utf-8
import socket
import select
from threading import Event

host = "0.0.0.0"
port = 12008

class EpollServer(object):
    def __init__(self):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((host, port))
        self.server.listen(1)
        self.server.setblocking(0)
        self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.epoll = select.epoll()
        self.epoll.register(self.server.fileno(), select.EPOLLIN)
        self.stop = Event()

    def run(self):
        try:
            conns = {}
            reqs = {}
            resps = {}
            while not self.stop.is_set():
                events = self.epoll.poll(1)
                for fileno, event in events:
                    if fileno == self.server.fileno():
                        conn, addr = self.server.accept()
                        conn.setblocking(0)
                        self.epoll.register(conn.fileno(), select.EPOLLIN)
                        conns[conn.fileno()] = conn
                        reqs[conn.fileno()] = b''
                        resps[conn.fileno()] = "hello world"
                    elif event & select.EPOLLIN:
                        data = conns[fileno].recv(1024)
                        print "receive data {} from {}".format(data, fileno)
                        reqs[fileno] += data
                        if len(data) < 1024:
                            self.epoll.modify(fileno, select.EPOLLOUT)
                    elif event & select.EPOLLOUT:
                        bytes_writen = conns[fileno].send(resps[fileno])
                        resps[fileno] = resps[fileno][bytes_writen:]
                        if len(resps[fileno]) == 0:
                            self.epoll.modify(fileno, select.EPOLLIN)
                    elif event & select.EPOLLHUP:
                        self.epoll.unregister(fileno)
                        conns[fileno].close()
                        del conns[fileno]
        finally:
            self.epoll.unregister(self.server.fileno())
            self.epoll.close()
            self.server.close()

if __name__ == "__main__":
    server = EpollServer()
    server.run()

epoll_client.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/env python
# encoding: utf-8
import time
import random
import socket


host = '0.0.0.0'
port = 12008

class SelectClient(object):
    def __init__(self):
        self.clients = []
        self.connections = 10

    def send(self, channel, data):
        channel.send(data)

    def receive(self, channel):
        pass

    def run(self):
        for i in xrange(self.connections):
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect((host, port))
            self.clients.append(client)

        for i in xrange(100):
            idx = random.randint(0, 9)
            self.send(self.clients[idx], 'message {}'.format(i))

        time.sleep(3)

        for client in self.clients:
            client.close()

if __name__ == '__main__':
    SelectClient().run()

这里的 epollselect 的使用方式就不一样了,这里不再是传递一个 fd 列表了,而是给每个 fd register 一个关注的事件,是入事件还是出事件,然后根据 fdevent 来处理。需要注意的是,这里的 Python 代码和前面所说的底层实现有一点差别,例如没有 epoll_create 函数之类的,反之,而是 epoll.registerepoll.poll 之类的。

Reference