概述
在我的博客中我以前已经用 Python 描述过 Epoll 了,但是在 Go 语言中还没有尝试过,很大一个原因是 Go 里面自带 Goroutine,但是 EPoll 仍然有更高的性能,所以这里我就尝试一下。
我之前讲述的 Python 代码版本为:select,poll 和 epoll 概述
一些细节
选择边缘触发还是水平触发
先来看一下两种触发方式的区别:
- 边缘触发:只会在有状态变化时被
epoll_wait
响应,例如收到了 1K 的数据 - 水平触发:只要缓冲区有数据都会一直触发
epoll_wait
响应
区别在于,如果缓冲区有 1K 的数据,如果你每次只读 100 个 byte 的数据,那么对于不同的触发方式效果是不一样的:
- 边缘触发:你读取了 100 bytes,剩下的 924 bytes 不会继续触发 epoll,只能等待下次有新的数据进来,如果没有新的数据,这些数据就无法读取了;
- 水平触发:你读取了 100 bytes,缓冲区还有 924 bytes,还会接着触发
epoll_wait
,下次还可以接着读
这里可以看到,各有优缺点:
- 边缘触发:epoll 的响应次数少,CPU 切换次数少(内核态和应用态切换少),但是需要你处理的事情多,你需要一次性将数据都读完,用不用得着先不管;
- 水平触发:epoll 的响应次数多,只要缓冲区还有数据,就会触发,CPU 切换次数多(内核态和用户态切换频繁),但是应用处理逻辑少,按需读数据即可;
所以我的观点是:
- 在高性能场景下,推荐使用 边缘触发 + 应用缓存 的方式。
EPOLL 如何检测客户端关闭?
- 如果是 2.6.17 以及之后内核的 Linux 中,可以关注 EPOLLRDHUP 事件,如果遇到这个事件,则可以判断对端关闭连接;
- 如果是 2.6.17 之前的内核或者不想用 EPOLLRDHUP 事件,EPOLL 在对端关闭时还会触发 EPOLLIN 和 EPOLLOUT 事件
- 如果 EPOLLIN read 返回 0,表示对端正常调用 close 关闭连接;
- 如果 EPOLLIN read 返回 -1,并且 errno 不是 EAGAIN 和 EINTR 的话,那就是异常断开连接;
多线程处理要点
- 惊群处理用:EPOLLEXCLUSIVE
- 如果两个线程同时调用
epoll_wait
,当有一个新连接进来时 - 线程 A accept 了线程 B 就没必要处理了
- 如果两个线程同时调用
- 多线程读取控制:EPOLLONESHOT
- 如果使用边缘触发,并且多个线程同时
epoll_wait
一个 fd,那么可能 - 线程 A 读取了一半的数据,然后有新数据进来,线程 B 也被唤醒,然后读取了另外一半的数据
- ONESHOT:监听到一次事件后就将对应的描述符从监听集合中移除,这样线程 B 也就不会读取到新的数据了
- 如果使用边缘触发,并且多个线程同时
备忘
常见的事件
常量 | 功能 | 描述 |
---|---|---|
EPOLLIN | 有连接到达;或者连接可读 | |
EPOLLOUT | 连接可写 |
代码
下面就直接贴一下 Go 版本:
package main
import (
"flag"
"fmt"
"log"
"net"
"strconv"
"syscall"
"epool-network-try/pkg/workerpool"
"github.com/liuliqiang/log4go"
"github.com/pkg/errors"
)
const (
EPOLLET = 1 << 31
EPOLLIN_SET = syscall.EPOLLIN | syscall.EPOLLHUP
)
var (
host string = "0.0.0.0"
port int = 3333
maxEpollEvents = 32
// readBufferMap = map[int]bytes.Buffer{} // fd -> buffer
// writeBufferMap = map[int]bytes.Buffer{} // fd- > buffer
wp = workerpool.NewWorkerPool()
)
type WorkFunc func()
func main() {
flag.StringVar(&host, "host", host, "listen host")
flag.IntVar(&port, "port", port, "listen port")
flag.IntVar(&maxEpollEvents, "max.events", maxEpollEvents, "max epoll events")
flag.Parse()
log4go.SetFlags(log.LstdFlags | log.Lshortfile)
epfd, err := syscall.EpollCreate1(0) // @remind : need to create a epoll
if err != nil {
panic(errors.Wrap(err, "epoll_create"))
}
defer syscall.Close(epfd)
fd, err := newTcpConn(host, port)
if err != nil {
panic(err)
}
defer syscall.Close(fd)
if err := addFDToEpoll(epfd, fd); err != nil {
panic(err)
}
log4go.Info("Ready to listen at %s:%d", host, port)
var events [1024]syscall.EpollEvent
for {
nevents, err := syscall.EpollWait(epfd, events[:], -1)
if err != nil {
panic(errors.Wrap(err, "epoll_wait"))
}
for ev := 0; ev < nevents; ev++ {
log4go.Info("Get event [%d] from fd: %d", events[ev].Events, events[ev].Fd)
switch {
case int(events[ev].Fd) == fd:
connFd, _, err := syscall.Accept(fd) // get the new connection
if err != nil {
log4go.Error("Failed to accept: %w", err)
continue
}
syscall.SetNonblock(connFd, true)
addFDToEpoll(epfd, connFd)
case events[ev].Events&EPOLLIN_SET != 0:
wp.Submit(workerpool.WorkerFunc(newReadWorkFunc(epfd, int(events[ev].Fd))))
}
}
log4go.Info("Continue to wait event again.")
}
}
func newTcpConn(host string, port int) (int, error) {
fd, err := syscall.Socket(syscall.AF_INET, syscall.O_NONBLOCK|syscall.SOCK_STREAM, 0)
if err != nil {
return 0, errors.Wrap(err, "create socket")
}
if err = syscall.SetNonblock(fd, true); err != nil { // @remind have to set nonblock before bind????, I don't know why?
return 0, errors.Wrap(err, "set non block")
}
addr := syscall.SockaddrInet4{
Port: port,
}
copy(addr.Addr[:], net.ParseIP(host).To4()) //@remind :Addr is an array, it's not easy to assign an array
syscall.Bind(fd, &addr)
// @remind :what's the 10 means
// 10 is the backlog, the max number of pending connections the queue will hold in the kernel
syscall.Listen(fd, 10)
return fd, nil
}
func addFDToEpoll(epfd, fd int) error {
// syscall.SetNonblock(fd, true)
// watch event:
// - data in
// - connection break
var event syscall.EpollEvent
event.Events = EPOLLIN_SET | EPOLLET
event.Fd = int32(fd)
// @remind add new connection into epoll, need to set fd status to nonblock
if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return errors.Wrap(err, "add fd with epoll_ctl_"+strconv.Itoa(fd))
}
return nil
}
func delFDFromEpoll(epfd, fd int) error {
if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, fd, nil); err != nil {
return errors.Wrap(err, "remove fd with epoll_ctl_"+strconv.Itoa(fd))
}
return nil
}
func newReadWorkFunc(epfd, fd int) WorkFunc {
return func() {
var totalBytes = 0
var buf [32 * 1024]byte
for {
nbytes, err := syscall.Read(fd, buf[:])
totalBytes += nbytes
if err != nil {
if err == syscall.EAGAIN {
return
}
log4go.Warn("Failed to read fd[%d]: %v", fd, err)
return
}
if nbytes <= 0 {
if totalBytes <= 0 {
delFDFromEpoll(epfd, fd)
}
return
}
if nbytes > 0 {
// @remind change to epoll write?
// wBuf, ok := writeBufferMap[fd]
// if !ok {
// wBuf = *bytes.NewBuffer(make([]byte, 0, 1024))
// writeBufferMap[fd] = wBuf
// }
// if n, err := wBuf.Write(buf[:nbytes]); err != nil || n != nbytes {
// log4go.Warn("Failed to save write buffer[%d/%d]: %w", n, nbytes, err)
// break
// }
fmt.Printf(">>> %s", buf)
if n, err := syscall.Write(fd, buf[:nbytes]); err != nil || n != nbytes {
log4go.Warn("Failed to write socket[%d/%d]: %w", n, nbytes, err)
}
fmt.Printf("<<< %s", buf)
}
}
}
}