概述

在我的博客中,我以前已经用 Python 介绍过 epoll,但还没有用 Go 语言尝试过。很大一个原因是 Go 自带 Goroutine,通常不需要直接操作 epoll;不过直接使用 epoll 仍然可以获得更高的性能,所以这里我就尝试一下。

我之前讲述的 Python 代码版本为:select,poll 和 epoll 概述

一些细节

选择边缘触发还是水平触发

先来看一下两种触发方式的区别:

区别在于:假设缓冲区中有 1K 的数据,而你每次只读取 100 个字节,那么两种触发方式的表现是不一样的:

这里可以看到,各有优缺点:

所以我的观点是:

EPOLL 如何检测客户端关闭?

多线程处理要点

备忘

常见的事件

常量 功能 描述
EPOLLIN 有连接到达;或者连接可读
EPOLLOUT 连接可写

代码

下面就直接贴一下 Go 版本:

  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "net"
  7. "strconv"
  8. "syscall"
  9. "epool-network-try/pkg/workerpool"
  10. "github.com/liuliqiang/log4go"
  11. "github.com/pkg/errors"
  12. )
  // EPOLLET is the edge-triggered flag (bit 31) that addFDToEpoll ORs into
  // EpollEvent.Events.
  // NOTE(review): presumably declared locally because the syscall package's
  // own EPOLLET constant does not fit the unsigned Events field on every
  // platform — confirm.
  const EPOLLET = 0x80000000

  // EPOLLIN_SET is the event mask watched on every registered descriptor:
  // incoming data (EPOLLIN) or a broken connection (EPOLLHUP).
  const EPOLLIN_SET = syscall.EPOLLHUP | syscall.EPOLLIN
  17. var (
  18. host string = "0.0.0.0"
  19. port int = 3333
  20. maxEpollEvents = 32
  21. // readBufferMap = map[int]bytes.Buffer{} // fd -> buffer
  22. // writeBufferMap = map[int]bytes.Buffer{} // fd- > buffer
  23. wp = workerpool.NewWorkerPool()
  24. )
  25. type WorkFunc func()
  26. func main() {
  27. flag.StringVar(&host, "host", host, "listen host")
  28. flag.IntVar(&port, "port", port, "listen port")
  29. flag.IntVar(&maxEpollEvents, "max.events", maxEpollEvents, "max epoll events")
  30. flag.Parse()
  31. log4go.SetFlags(log.LstdFlags | log.Lshortfile)
  32. epfd, err := syscall.EpollCreate1(0) // @remind : need to create a epoll
  33. if err != nil {
  34. panic(errors.Wrap(err, "epoll_create"))
  35. }
  36. defer syscall.Close(epfd)
  37. fd, err := newTcpConn(host, port)
  38. if err != nil {
  39. panic(err)
  40. }
  41. defer syscall.Close(fd)
  42. if err := addFDToEpoll(epfd, fd); err != nil {
  43. panic(err)
  44. }
  45. log4go.Info("Ready to listen at %s:%d", host, port)
  46. var events [1024]syscall.EpollEvent
  47. for {
  48. nevents, err := syscall.EpollWait(epfd, events[:], -1)
  49. if err != nil {
  50. panic(errors.Wrap(err, "epoll_wait"))
  51. }
  52. for ev := 0; ev < nevents; ev++ {
  53. log4go.Info("Get event [%d] from fd: %d", events[ev].Events, events[ev].Fd)
  54. switch {
  55. case int(events[ev].Fd) == fd:
  56. connFd, _, err := syscall.Accept(fd) // get the new connection
  57. if err != nil {
  58. log4go.Error("Failed to accept: %w", err)
  59. continue
  60. }
  61. syscall.SetNonblock(connFd, true)
  62. addFDToEpoll(epfd, connFd)
  63. case events[ev].Events&EPOLLIN_SET != 0:
  64. wp.Submit(workerpool.WorkerFunc(newReadWorkFunc(epfd, int(events[ev].Fd))))
  65. }
  66. }
  67. log4go.Info("Continue to wait event again.")
  68. }
  69. }
  70. func newTcpConn(host string, port int) (int, error) {
  71. fd, err := syscall.Socket(syscall.AF_INET, syscall.O_NONBLOCK|syscall.SOCK_STREAM, 0)
  72. if err != nil {
  73. return 0, errors.Wrap(err, "create socket")
  74. }
  75. if err = syscall.SetNonblock(fd, true); err != nil { // @remind have to set nonblock before bind????, I don't know why?
  76. return 0, errors.Wrap(err, "set non block")
  77. }
  78. addr := syscall.SockaddrInet4{
  79. Port: port,
  80. }
  81. copy(addr.Addr[:], net.ParseIP(host).To4()) //@remind :Addr is an array, it's not easy to assign an array
  82. syscall.Bind(fd, &addr)
  83. // @remind :what's the 10 means
  84. // 10 is the backlog, the max number of pending connections the queue will hold in the kernel
  85. syscall.Listen(fd, 10)
  86. return fd, nil
  87. }
  88. func addFDToEpoll(epfd, fd int) error {
  89. // syscall.SetNonblock(fd, true)
  90. // watch event:
  91. // - data in
  92. // - connection break
  93. var event syscall.EpollEvent
  94. event.Events = EPOLLIN_SET | EPOLLET
  95. event.Fd = int32(fd)
  96. // @remind add new connection into epoll, need to set fd status to nonblock
  97. if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
  98. return errors.Wrap(err, "add fd with epoll_ctl_"+strconv.Itoa(fd))
  99. }
  100. return nil
  101. }
  102. func delFDFromEpoll(epfd, fd int) error {
  103. if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, fd, nil); err != nil {
  104. return errors.Wrap(err, "remove fd with epoll_ctl_"+strconv.Itoa(fd))
  105. }
  106. return nil
  107. }
  108. func newReadWorkFunc(epfd, fd int) WorkFunc {
  109. return func() {
  110. var totalBytes = 0
  111. var buf [32 * 1024]byte
  112. for {
  113. nbytes, err := syscall.Read(fd, buf[:])
  114. totalBytes += nbytes
  115. if err != nil {
  116. if err == syscall.EAGAIN {
  117. return
  118. }
  119. log4go.Warn("Failed to read fd[%d]: %v", fd, err)
  120. return
  121. }
  122. if nbytes <= 0 {
  123. if totalBytes <= 0 {
  124. delFDFromEpoll(epfd, fd)
  125. }
  126. return
  127. }
  128. if nbytes > 0 {
  129. // @remind change to epoll write?
  130. // wBuf, ok := writeBufferMap[fd]
  131. // if !ok {
  132. // wBuf = *bytes.NewBuffer(make([]byte, 0, 1024))
  133. // writeBufferMap[fd] = wBuf
  134. // }
  135. // if n, err := wBuf.Write(buf[:nbytes]); err != nil || n != nbytes {
  136. // log4go.Warn("Failed to save write buffer[%d/%d]: %w", n, nbytes, err)
  137. // break
  138. // }
  139. fmt.Printf(">>> %s", buf)
  140. if n, err := syscall.Write(fd, buf[:nbytes]); err != nil || n != nbytes {
  141. log4go.Warn("Failed to write socket[%d/%d]: %w", n, nbytes, err)
  142. }
  143. fmt.Printf("<<< %s", buf)
  144. }
  145. }
  146. }
  147. }