开发内功修炼@张彦飞开发内功修炼@张彦飞

talk is cheap,
show me the code!

在 golang 中是如何对 epoll 进行封装的?

在 golang 中是如何对 epoll 进行封装的?


在协程没有流行以前,传统的网络编程中,同步阻塞是性能低下的代名词,一次切换就得是 3 us左右的 CPU 开销。各种基于 epoll 的异步非阻塞的模型虽然提高了性能,但是基于回调函数的编程方式却非常不符合人的的直线思维模式。开发出来的代码的也不那么容易被人理解。

Golang 的出现,可以说是将协程编程模式推向了一个高潮。这种新的编程方式既兼顾了同步编程方式的简单易用,也在底层通过协程和 epoll 的配合避免了线程切换的性能高损耗。换句话说就是既简单易用,性能又还不挺错。

飞哥当年也是相中的 golang 的这个特点,开始带领团队转型 golang 开发的。那么今天我们来深刻地和大家分享一下 golang 官方提供的 net 包,来看看它是如何达成上面所说的这样的效果的。

一、Golang net的使用方式

考虑到不少读者没有使用过 golang,那么开头我先把一个基于官方 net 包的 golang 服务的简单使用代码给大家列出来。为了方便大家理解,我只保留骨干代码。

func main() {
    //构造一个listener
    listener, _ := net.Listen("tcp", "127.0.0.1:9008")

    for {
        //接收请求
        conn, err := listener.Accept()

        //启动一个协程来处理
        go process(conn)
    }
}

func process(conn net.Conn) {
    //结束时关闭连接
    defer conn.Close()

    //读取连接上的数据
    var buf [1024]byte
    len, err := conn.Read(buf[:])

    //发送数据
    _, err = conn.Write([]byte("I am server!"))

    ...
}

在这个示例服务程序中,先是使用 net.Listen 来监听了本地的 9008 这个端口。然后调用 Accept 进行接收连接处理。如果接收到了连接请求,通过 go process() 来启动一个协程进行处理。在连接的处理中我展示了读写操作(Read 和 Write)。

整个服务程序看起来,妥妥的就是一个同步模型,包括 Accept、Read 和 Write 都会将当前协程给“阻塞”掉。比如 Read 函数这里,如果服务器调用时客户端数据还没有到达,那么 Read 是不带返回的,会将当前的协程 park 住。直到有了数据 Read 才会返回,处理协程继续执行。

你如果在其它语言,例如 C 和 Java 中写出这样类似的服务器代码,估计会被打死的。因为每一次同步的 Accept、Read、Write 都会导致你当前的线程被阻塞掉,会浪费大量的 CPU 进行线程上下文的切换。

但是在 golang 中这样的代码运行性能却是非常的不错,为啥呢?我们继续看本文接下来的内容。

二、Listen 底层过程

在传统的 C、Java 等传统语言中,listen 所做的事情就是直接调用内核的 listen 系统调用。参见《为什么服务端程序都需要先 listen 一下?》。但是如果你也这么同等地理解 golang net 包里的 Listen, 那可就大错特错了。

和其它语言不同,在 golang net 的 listen 中,会完成如下几件事:

  • 创建 socket 并设置非阻塞,
  • bind 绑定并监听本地的一个端口
  • 调用 listen 开始监听
  • epoll_create 创建一个 epoll 对象
  • epoll_etl 将 listen 的 socket 添加到 epoll 中等待连接到来

一次 Golang 的 Listen 调用,相当于在 C 语言中的 socket、bind、listen、epoll_create、epoll_etl 等多次函数调用的效果。封装度非常的高,更大程度地对程序员屏蔽了底层的实现细节。

插一句题外话:现在的各种开发工具的封装程度越来越高,真不知道对码农来说是好事还是坏事。好处是开发效率更高了,缺点是将来的程序员想了解底层也越来越难了,越来越像传统企业里流水线上的工人。

口说无凭,我们挖开 Golang 的内部源码瞅一瞅,这样更真实。

listen.png

Listen 的入口在 golang 源码的 net/dial.go 文件中,让我们展开来看更细节的逻辑。

2.1 Listen 入口执行流程

源码不用细看,看懂大概流程就可以。

//file:go1.14.4/src/net/dial.go
func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

可见,这个 Listen 只是一个入口。接下来会进入到 ListenConfig 下的 Listen 方法中。在 ListenConfig 的 Listen 中判断这是一个 TCP 类型的话,会进入到 sysListener 下的 listenTCP 方法里(src/net/tcpsock_posix.go)。然后再经过两三次的函数调用跳转,会进入到 net/sock_posix.go 文件下的 socket 函数中。我们直接看它。

//file:go1.14.4/src/net/sock_posix.go
func socket(ctx context.Context, net string, family, ...) (fd *netFD, err error) {
    //创建 socket,见 2.2 小节 
    s, err := sysSocket(family, sotype, proto)

    ...

    //TCP 绑定和监听,见 2.3 小节
    //epoll对象的创建以及文件描述符的添加 见 2.4 小节
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            fd.listenStream(laddr, listenerBacklog(), ctrlFn);
        ......
    }
}

接下来我们分别在 2.2 和 2.3 小节来介绍 sysSocket 和 listenStream 这两个函数。

2.2 创建 socket

sysSocket 这个函数和其它语言中的 socket 函数有很大的不同。在这个一个函数内就完成了三件事,创建 socket、bind 和 listen 监听。我们来看 sysSocket的具体代码。

//file:net/sys_cloexec.go
func sysSocket(family, sotype, proto int) (int, error) {
    //创建 socket
    s, err := socketFunc(family, sotype, proto)

    //设置为非阻塞模式
    syscall.SetNonblock(s, true)
}

在 sysSocket 中,调用的 socketFunc 其实就是 socket 系统调用。见如下代码。

//file:net/hook_unix.go
var (
    // Placeholders for socket system calls.
    socketFunc        func(int, int, int) (int, error)  = syscall.Socket
    connectFunc       func(int, syscall.Sockaddr) error = syscall.Connect
    listenFunc        func(int, int) error              = syscall.Listen
    getsockoptIntFunc func(int, int, int) (int, error)  = syscall.GetsockoptInt
)

创建完 socket 之后,再调用 syscall.SetNonblock 将其设置为非阻塞模式。

//file:syscall/exec_unix.go
func SetNonblock(fd int, nonblocking bool) (err error) {
    ...
    if nonblocking {
        flag |= O_NONBLOCK
    } 
    fcntl(fd, F_SETFL, flag)
}

2.3 绑定和监听

我们接着再来看 listenStream。这个函数一进来就调用了系统调用 bind 和 listen 来完成了绑定和监听。

//file:net/sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr,...) error 
{
    ...

    //等同于 c 语言中的: bind(listenfd, ...)
    syscall.Bind(fd.pfd.Sysfd, lsa);

    //等同于 c 语言中的:listen(listenfd, ...)
    listenFunc(fd.pfd.Sysfd, backlog);

    //这里非常关键:初始化socket与异步IO相关的内容
    if err = fd.init(); err != nil {
        return err
    }
}

其中 listenFunc 是一个宏,指向的就是 syscall.Listen 系统调用

//file:go1.14.4/src/net/hook_unix.go
import "syscall"
var (
    // Placeholders for socket system calls.
    socketFunc        func(int, int, int) (int, error)  = syscall.Socket
    connectFunc       func(int, syscall.Sockaddr) error = syscall.Connect
    listenFunc        func(int, int) error              = syscall.Listen
    getsockoptIntFunc func(int, int, int) (int, error)  = syscall.GetsockoptInt
)

2.4 epoll创建和初始化

接下来在 fd.init 这一行,经过多次的函数调用展开以后会执行到 epoll 对象的创建,并还把在 listen 状态的 socket 句柄添加到了 epoll 对象中来管理其网络事件。

我们来看它是如何完成的。

//file:go1.14.4/src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {

    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}

serverInit.Do 这个是用来保证参数内的函数只执行一次的。不过多展开介绍。其参数 runtime_pollServerInit 是对 runtime 包的函数 poll_runtime_pollServerInit 的调用,其源码位于 runtime/netpoll.go 下。

//file:runtime/netpoll.go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

该函数会执行到 netpollGenericInit, epoll 就是在它的内部创建的。

//file:netpoll_epoll.go
func netpollinit() {
    // epoll 对象的创建
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    ...
}

再来看 runtime_pollOpen。它的参数就是前面 listen 好了的 socket 的文件描述符。在这个函数里,它将被放到 epoll 对象中。

//file:runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

//file:runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd

    // listen 状态的 socket 被添加到了 epoll 中。
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

三、Accept 过程

服务端在 Listen 完了之后,就是对 Accept 的调用了。该函数主要做了三件事

  • 调用 accept 系统调用接收一个连接
  • 如果没有连接到达,把当前协程阻塞掉
  • 新连接到来的话,将其添加到 epoll 中管理,然后返回

![](accept.png)

通过 Golang 里的单步调试可以看到它进入到了 TCPListener 下的 Accept 里了。

//file: net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error) {
    c, err := l.accept()
    ...
}
func (ln *TCPListener) accept() (*TCPConn, error) {
    //以 netFD 的形式返回一个新连接
    fd, err := ln.fd.accept()
}

我们上面说的三步都是在 netFD 的 accept 函数里处理的。

//file:net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
    //3.1 接收一个连接
    //3.2 如果连接没有到达阻塞当前协程
    d, rsa, errcall, err := fd.pfd.Accept()

    //3.2 将新到的连接也添加到 epoll 中进行管理
    netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
    netfd.init();

    ...
    return netfd, nil
}

接下来我们详细看每一步的细节。

3.1 接收一个连接

经过单步跟踪后发现 Accept 进入到了 FD 对象的 Accept 方法下。在这里将调用操作系统的 accept 系统调用。

//file:internal/poll/fd_unix.go
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {

    for {
        //调用 accept 系统调用接收一个连接
        s, rsa, errcall, err := accept(fd.Sysfd)

        //接收到了连接就返回它
        if err == nil {
            return s, rsa, "", err
        }

        switch err {
        case syscall.EAGAIN:
            //如果没有获取到,那就把协程给阻塞起来
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        ...    
    }
    ...
}

其中 accept 方法内部会触发 linux 操作系统的 accept 系统调用,我们就不过度展开了。
调用 accept 目的是获取一个来自客户端的连接。如果接收到了,就把他返回回去。

3.2 阻塞当前协程

我们来说说如果没 accept 调用的时候,客户端的连接请求还一个都没有过来怎么办。

这时候,accept 系统调用会返回 syscall.EAGAIN。Golang 在对这个状态的处理中,会把当前协程给阻塞起来。关键代码在这里

//file: internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

runtime_pollWait 的源码在 runtime/netpoll.go 下。gopark(协程的阻塞)就是在这里完成的。

//file:runtime/netpoll.go
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    for !netpollblock(pd, int32(mode), false) {
    }
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    ...
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
}

gopark 这个函数就是 golang 内部阻塞协程的入口。

3.3 将新连接添加到 epoll 中。

我们再来说说假如客户端连接已经到来了的情况。这时 fd.pfd.Accept 会返回新建的连接。然后会将该新连接也一并加入到 epoll 中进行高效的事件管理。

//file:net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
    //3.1 接收一个连接
    //3.2 如果连接没有到达阻塞当前协程
    d, rsa, errcall, err := fd.pfd.Accept()

    //3.2 将新到的连接也添加到 epoll 中进行管理
    netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
    netfd.init();

    ...
    return netfd, nil
}

我们来看 netfd.init

//file:internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
    ...
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
}

runtime_pollOpen 这个runtime 函数我们在上面的 2.4 节介绍过了,就是把文件句柄添加到 epoll 对象中。

//file:runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd

    //新连接的 socket 也被添加到了 epoll 中。
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

四、Read 和 Write 内部过程

当连接接收完成后,剩下的就是在连接上的读写了。

4.1 Read 内部过程

我们先来看 Read。

![](read.png)

来看详细的代码。

//file:/Users/zhangyanfei/sdk/go1.14.4/src/net/net.go
func (c *conn) Read(b []byte) (int, error) {
    ...
    n, err := c.fd.Read(b)
}

Read 函数会进入到 FD 的 Read 中。在这个函数内部调用 Read 系统调用来读取数据。如果数据还尚未到达则也是把自己阻塞起来。

//file:internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
    for {
        //调用 Read 系统调用
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0

            //将自己添加到 epoll 中等待事件,然后阻塞掉。
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        ......    
    }        
}

其中 waitRead 是如何将当前协程阻塞掉的,这个和我们前面 3.2 节介绍的是一样的,就不过多展开叙述了。

4.2 Write 内部过程

Write 的大体过程和 Read 是类似的。 先是调用 Write 系统调用发送数据,如果内核发送缓存区不足的时候,就把自己先阻塞起来,然后等可写时间发生的时候再继续发送。其源码入口位于 net/net.go。

//file:net/net.go
func (c *conn) Write(b []byte) (int, error) {
    ...
    n, err := c.fd.Write(b)
}
//file:internal/poll/fd_unix.go
func (fd *FD) Write(p []byte) (int, error) {
    for {
        n, err := syscall.Write(fd.Sysfd, p[nn:max])
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitWrite(fd.isFile); err == nil {
                continue
            }
        }
    }
}
//file:internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitWrite(isFile bool) error {
    return pd.wait('w', isFile)
}

pd.wait 之后的事情就又和 3.2 节介绍的过程一样了。调用 runtime_pollWait 来讲当前协程阻塞掉。

func (pd *pollDesc) wait(mode int, isFile bool) error {
    ...
    res := runtime_pollWait(pd.runtimeCtx, mode)
}

五、Golang 唤醒

前面我们讨论的很多步骤里都涉及到协程的阻塞。例如 Accept 时如果新连接还尚未到达。再比如像 Read 数据的时候对方还没有发送,当前协程都不会占着 cpu 不放,而是会阻塞起来。

那么当要等待的事件就绪的时候,被阻塞掉的协程又是如何被重新调度的呢? 相信大家一定会好奇这个问题。

Go 语言的运行时会在调度或者系统监控中调用 sysmon,它会调用 netpoll,来不断地调用 epoll_wait 来查看 epoll 对象所管理的文件描述符中哪一个有事件就绪需要被处理了。如果有,就唤醒对应的协程来进行执行。

其实除此之外还有几个地方会唤醒协程,如

  • startTheWorldWithSema
  • findrunnable 在 schedule 中调用
    有top 和 stop 之分。 其中 stop 中会导致阻塞。
  • pollWork

不过为了简便起见,我们只选择 sysmon 来作为一个切入口。sysmon 是一个周期性的监控协程,来看源码。

//file:src/runtime/proc.go
func sysmon() {
    ...
    list := netpoll(0) 
}

它会不断触发对 netpoll 的调用,在 netpoll 会调用 epollwait 看查看是否有网络事件发生。

//file:runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
    ...
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        //没有网络事件
        goto retry
    }

    for i := int32(0); i < n; i++ {

        //查看是读事件还是写事件发生
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }

        if mode != 0 {

            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            netpollready(&toRun, pd, mode)
        }
    }
}

在 epoll 返回的时候,ev.data 中是就绪的网络 socket 的文件描述符。根据网络就绪 fd 拿到 pollDesc。在 netpollready 中,将对应的协程推入可运行队列等待调度执行。

//file:runtime/netpoll.go
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

本文总结

同步编码方式的优点是符合人的直线思维。在这种模式下的代码很容易写,写出来也容易理解,但是缺点就是性能奇差。因为会导致频繁的线程上下文切换。

所以现在 epoll 是 Linux 下网络程序工作的最主要的模式。现在各种语言下的流行的网络框架模型都是基于 epoll 来工作的。区别就是各自对 epoll 的使用方式上存在一些差别。主流各种基于 epoll 的异步非阻塞的模型虽然提高了性能,但是基于回调函数的编程方式却非常不符合人的的直线思维模式。开发出来的代码的也不那么容易被人理解。

Golang开辟了一种新的网络编程模型。这种模型在应用层看来仍然是同步的方式。但是在底层确实通过协程和 epoll 的配合避免了线程切换的性能高损耗,因此并不会阻塞用户线程。代替的是切换开销更小的协程。协程的切换开销大约只有线程切换的三十分之一,参见《协程究竟比线程牛在什么地方?》

我个人一直觉得,Golang 封装的网络编程模型非常之精妙,是世界级的代码。它非常值得你好好学习一下。学完了觉得好的话,转发给你的朋友们一起来了解了解吧!

往期相关文章

写在最后,由于我的这些知识在公众号里文章比较分散,很多人似乎没有理解到我对知识组织的体系结构。而且图文也不像视频那样理解起来更直接。所以我在知识星球上规划了视频系列课程,包括硬件原理、内存管理、进程管理、文件系统、网络管理、Golang语言、容器原理、性能观测、性能优化九大部分大约 120 节内容,每周更新。加入方式参见我要开始搞知识星球啦如何才能高效地学习技术,我投“融汇贯通”一票

Github:https://github.com/yanfeizhang/coder-kung-fu
关注公众号:微信扫描下方二维码
qrcode2_640.png

本原创文章未经允许不得转载 | 当前页面:开发内功修炼@张彦飞 » 在 golang 中是如何对 epoll 进行封装的?