概述

在写 Go 网络程序的时候,有个 TCP 连接的方法系列是我很常用的:SetDeadlineSetReadDeadlineSetWriteDeadline,他们的作用就是等待 TCP 连接的读写操作,如果在预设的时间点还没有读或者写操作的话,读的方法就会直接返回,并且返回一个 os.ErrDeadlineExceeded 类型的错误,这通常用于在判定连接是否已经失联的情况。

为什么要用 Deadline

对 TCP 有一些了解的同学可能都会问,TCP 不是有 Keepalive 么,为什么还需要单独用 Connection 的 Deadline。很明显,我们知道 TCP 的 keepalive 是系统级的,它的配置路径在 /proc/sys/net/ipv4/ 目录下:

  1. [root@liqiang.io]# ls -al /proc/sys/net/ipv4/tcp_keepalive_*
  2. -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_intvl
  3. -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_probes
  4. -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_time

如果选择用 TCP 的 keepalive 配置,那么你只能通过系统的全局配置来生效,但是这在企业级网络编程中都是不太实际的,每个应用可能都有自己的独特需求,所以不能一概而论使用同一份配置。

如何使用 Deadline

一个简单的使用 Deadline 的 demo 为:

  1. [root@liqiang.io]# cat test.go
  2. func (c *ApplicationConnection) readLoop() {
  3. var err error
  4. c.conn, err = net.Dial("tcp", c.GetDialerAddr())
  5. if err != nil {
  6. c.logger.Error(c.ctx, "connect to %s: %v", d.GetDialerAddr(), err)
  7. return
  8. }
  9. c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
  10. for {
  11. bytes, err := io.ReadAll(c.conn)
  12. if err != nil {
  13. if errors.Is(err, os.ErrDeadlineExceeded) {
  14. c.logger.Trace(c.ctx, "Read timeout, send a heartbeat message")
  15. c.heartbeat()
  16. continue
  17. }
  18. c.logger.Error(c.ctx, "Failed to copy data from listener to Dialer: %v", err)
  19. return
  20. }
  21. c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
  22. c.process(bytes)
  23. }
  24. }

这里我们可以设置一个等待时间,如果在预期时间内连接没有数据可以读取,那么就会返回一个错误,我们即可根据需要对连接进行处理;如果在这期间有数据可以读取,那么我们需要注意要重置一下 Deadline 的值,不然,这个值还是会有效的。

如何实现 Deadline

在知道如何使用之后,接下来的问题就是 Go 又是如何实现 Connection 的 Deadline 的呢?在之前 Go 的源码分析中,我们知道了在网络上 Go 底层还是使用的 epoll,那么丢与 Deadline ,Go runtime 又是如何做到的?

我可以想象的一个解决方法是将 Connection 包装成一个 struct 结构,struct 结构里面包含原始的连接信息,deadline 信息(一个 timer),以及 timer 处理函数,然后在 epoll 中添加对应的 timer,这样,当 timer 被 trigger 的时候,就表示这期间都没有可读的事件,所以可以直接调用处理函数,从而达到 deadline 的效果。

但是,Go 如何实现的,还是要具体地看代码(这里我依照之前的 Go 源码分析逻辑,看的是 1.12 的代码):

  1. [root@liqiang.io]# internal/poll/fd_poll_runtime.go
  2. func (fd *FD) SetReadDeadline(t time.Time) error {
  3. return setDeadlineImpl(fd, t, 'r')
  4. ---> internal/poll/fd_poll_runtime.go
  5. func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
  6. ... ...
  7. runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
  8. ---> runtime/netpoll.go
  9. func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
  10. ... ...
  11. netpollgoready(rg, 3)
  12. ---> runtime/netpoll.go
  13. ... ...
  14. pd.rt.f = rtf
  15. pd.rt.when = pd.rd
  16. pd.rt.arg = pd
  17. pd.rt.seq = pd.rseq
  18. addtimer(&pd.rt)

可以看到,Go 的实现大体上和我的设想差不多,只不过多了更多的条件保障之类的。

C++ 如何实现

最近在尝试 C++ 编写一些网络程序,所以自然而然我也想看下 C++ 是否支持类似的功能,但是好像没有直接支持的方式,于是我就基于 libevent 自己实现了一下,代码主要分为几部分:

  1. [root@liqiang.io]# cat echo_server.cpp
  2. struct TimeoutConnection {
  3. long int last_read_ts;
  4. std::string name;
  5. evutil_socket_t fd;
  6. struct event_base *base;
  7. struct bufferevent *bev;
  8. struct event *timeout_event;
  9. };
  1. static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) {
  2. std::cout << get_current_time() << " listener_cb" << std::endl;
  3. struct event_base *base = (event_base *) user_data;
  4. struct bufferevent *bev;
  5. bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
  6. if (!bev) {
  7. std::cerr << get_current_time() << " failed to constructing bufferevent!" << std::endl;
  8. event_base_loopbreak(base);
  9. return;
  10. }
  11. struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) malloc(sizeof(struct TimeoutConnection));
  12. timeoutConn->last_read_ts = std::time(nullptr);
  13. timeoutConn->name = "test";
  14. timeoutConn->fd = fd;
  15. timeoutConn->base = base;
  16. timeoutConn->bev = bev;
  17. timeoutConn->timeout_event = event_new(base, -1, EV_PERSIST, timeout_cb, timeoutConn);
  18. bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, timeoutConn);
  19. bufferevent_enable(bev, EV_READ);
  20. struct timeval tv = {.tv_sec = 5, .tv_usec = 0};
  21. auto result = event_add(timeoutConn->timeout_event, &tv);
  22. if (result != 0) {
  23. std::cout << "event_add failed" << std::endl;
  24. }
  25. }
  1. [root@liqiang.io]# cat echo_server.cpp
  2. static void conn_readcb(struct bufferevent *bev, void *ptr) {
  3. struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) ptr;
  4. timeoutConn->last_read_ts = std::time(nullptr);
  5. char buf[1024];
  6. int n;
  7. struct evbuffer *input = bufferevent_get_input(bev);
  8. while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
  9. std::cout << get_current_time() << " connection " << timeoutConn->name << " recv: " << buf << std::endl;
  10. bufferevent_write(bev, buf, n);
  11. }
  12. }
  1. [root@liqiang.io]# cat echo_server.cpp
  2. static void timeout_cb(evutil_socket_t fd, short what, void *arg) {
  3. std::cout << get_current_time() << " connection timeout invoked" << std::endl;
  4. auto currTs = std::time(nullptr);
  5. struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) arg;
  6. if (currTs - timeoutConn->last_read_ts > 10) {
  7. std::cout << get_current_time() << " connection " << timeoutConn->name << " timeout" << std::endl;
  8. std::cout << get_current_time() << " connection " << timeoutConn->name << " last read at: " << timeoutConn->last_read_ts << std::endl;
  9. bufferevent_free(timeoutConn->bev);
  10. event_free(timeoutConn->timeout_event);
  11. free(timeoutConn);
  12. return;
  13. }
  14. std::cout << get_current_time() << " connection " << timeoutConn->name << " last read at: " << timeoutConn->last_read_ts << std::endl;
  15. }

概述

到这里差不多就了解了这部分的实现了,倒是也不怎么复杂。但是,可以深究的事情也很多,例如 Go 底层的 timer 是如何实现的这些,但是这些我以前也写过类似的(Linux 实现定时器),所以就不过多深究了。