Background

When writing Go networking applications, there is a set of TCP connection methods I use a lot: SetDeadline, SetReadDeadline, and SetWriteDeadline. They bound how long a read or write on a TCP connection may wait: if the operation has not completed by the preset time, it returns an ErrDeadlineExceeded error, which is often used to detect a lost connection. In this article, I’ll describe how to use them, how Go implements them, and how to achieve a similar effect in C++.

Why we need Deadline

If you know something about TCP, you might ask: doesn’t TCP already have a keepalive mechanism? Why do we still need deadlines on the connection? The catch is that TCP keepalive is configured at the system level, under the /proc/sys/net/ipv4/ directory:

  [root@liqiang.io]# ls -al /proc/sys/net/ipv4/tcp_keepalive_*
  -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_intvl
  -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_probes
  -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_time

If you rely on TCP keepalive, you have to change this system-wide configuration, which is not good practice in enterprise network development: each application has its own requirements, so they cannot all share the same global settings.

How to use Deadline

A simple demo using Deadline looks like this:

  [root@liqiang.io]# cat test.go
  func (c *ApplicationConnection) readLoop() {
      var err error
      c.conn, err = net.Dial("tcp", c.GetDialerAddr())
      if err != nil {
          c.logger.Error(c.ctx, "connect to %s: %v", c.GetDialerAddr(), err)
          return
      }
      c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
      buf := make([]byte, 4096)
      for {
          n, err := c.conn.Read(buf)
          if err != nil {
              if errors.Is(err, os.ErrDeadlineExceeded) {
                  c.logger.Trace(c.ctx, "Read timeout, send a heartbeat message")
                  c.heartbeat()
                  // Re-arm the deadline: the old one has already expired.
                  c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
                  continue
              }
              c.logger.Error(c.ctx, "Failed to read from the connection: %v", err)
              return
          }
          c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
          c.process(buf[:n])
      }
  }

Here we set a waiting time: if no data arrives on the connection within that window, the read returns an error and we can handle the connection as needed. If data does arrive, we must remember to reset the deadline after each successful read; otherwise the old deadline remains in effect and a later read may fail immediately.

How to implement Deadline

Now that we know how to use deadlines, the next question is how Go implements them. From the previous analysis of Go’s source code, we know that Go’s network layer is built on epoll at the bottom, so how does the Go runtime handle deadlines?

One solution I can imagine is to wrap the connection in a struct that holds the original connection, the deadline information (a timer), and a timer handler, and to register the timer alongside the connection’s events. When the timer fires, it means no readable event arrived in the meantime, so the handler can be invoked directly, achieving the effect of a deadline.

However, to see how Go actually achieves this, we still have to look at the code (following the previous Go source code analysis, I’m reading Go 1.12):

  [root@liqiang.io]# cat internal/poll/fd_poll_runtime.go
  func (fd *FD) SetReadDeadline(t time.Time) error {
      return setDeadlineImpl(fd, t, 'r')
  ---> internal/poll/fd_poll_runtime.go
  func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
      ... ...
      runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
  ---> runtime/netpoll.go
  func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
      ... ...
      netpollgoready(rg, 3)
  ---> runtime/netpoll.go
  ... ...
  pd.rt.f = rtf
  pd.rt.when = pd.rd
  pd.rt.arg = pd
  pd.rt.seq = pd.rseq
  addtimer(&pd.rt)

As you can see, the Go implementation is largely what I envisioned, just with more conditional checks and bookkeeping around the timer.

How to implement in C++

Recently, I’ve been trying to write some network programs in C++, so naturally I wanted to see whether C++ supports similar functionality. There doesn’t seem to be any direct support for it, so I implemented it myself on top of libevent. The code is divided into several parts:

  [root@liqiang.io]# cat echo_server.cpp
  struct TimeoutConnection {
      long int last_read_ts;
      std::string name;
      evutil_socket_t fd;
      struct event_base *base;
      struct bufferevent *bev;
      struct event *timeout_event;
  };

  static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
                          struct sockaddr *sa, int socklen, void *user_data) {
      std::cout << get_current_time() << " listener_cb" << std::endl;
      struct event_base *base = (event_base *) user_data;
      struct bufferevent *bev;
      bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
      if (!bev) {
          std::cerr << get_current_time() << " failed to construct bufferevent!" << std::endl;
          event_base_loopbreak(base);
          return;
      }
      // Use new rather than malloc: TimeoutConnection holds a std::string,
      // so its constructor must run.
      TimeoutConnection *timeoutConn = new TimeoutConnection();
      timeoutConn->last_read_ts = std::time(nullptr);
      timeoutConn->name = "test";
      timeoutConn->fd = fd;
      timeoutConn->base = base;
      timeoutConn->bev = bev;
      timeoutConn->timeout_event = event_new(base, -1, EV_PERSIST, timeout_cb, timeoutConn);
      bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, timeoutConn);
      bufferevent_enable(bev, EV_READ);
      struct timeval tv = {.tv_sec = 5, .tv_usec = 0};
      auto result = event_add(timeoutConn->timeout_event, &tv);
      if (result != 0) {
          std::cout << "event_add failed" << std::endl;
      }
  }
  [root@liqiang.io]# cat echo_server.cpp
  static void conn_readcb(struct bufferevent *bev, void *ptr) {
      struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) ptr;
      timeoutConn->last_read_ts = std::time(nullptr);
      char buf[1024];
      int n;
      struct evbuffer *input = bufferevent_get_input(bev);
      while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
          // buf is not NUL-terminated, so print exactly n bytes.
          std::cout << get_current_time() << " connection " << timeoutConn->name
                    << " recv: " << std::string(buf, n) << std::endl;
          bufferevent_write(bev, buf, n);
      }
  }
  [root@liqiang.io]# cat echo_server.cpp
  static void timeout_cb(evutil_socket_t fd, short what, void *arg) {
      std::cout << get_current_time() << " connection timeout invoked" << std::endl;
      auto currTs = std::time(nullptr);
      struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) arg;
      if (currTs - timeoutConn->last_read_ts > 10) {
          std::cout << get_current_time() << " connection " << timeoutConn->name << " timeout" << std::endl;
          std::cout << get_current_time() << " connection " << timeoutConn->name
                    << " last read at: " << timeoutConn->last_read_ts << std::endl;
          bufferevent_free(timeoutConn->bev);
          event_free(timeoutConn->timeout_event);
          // Release the per-connection state with delete so the std::string
          // member is destroyed properly.
          delete timeoutConn;
          return;
      }
      std::cout << get_current_time() << " connection " << timeoutConn->name
                << " last read at: " << timeoutConn->last_read_ts << std::endl;
  }

Summary

That’s pretty much all there is to this part of the implementation, and it’s not too complicated. There is still a lot to dig into, such as how Go’s underlying timer is implemented, but I’ve written about similar things before (Implementing Timers on Linux), so I won’t go further here.