Fork me on GitHub
随笔 - 213  文章 - 13  trackbacks - 0
<2017年6月>
28293031123
45678910
11121314151617
18192021222324
2526272829301
2345678


专注即时通讯及网游服务端编程
------------------------------------
Openresty之lua-nginx-module
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿(1)

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 88948
  • 排名 - 230

最新评论

阅读排行榜

http://skoo.me/go/2014/04/21/go-net-core

Go语言的出现,让我见到了一门语言把网络编程这件事情给做“正确”了,当然,除了Go语言以外,还有很多语言也把这件事情做”正确”了。我一直坚持着这样的理念——要做"正确"的事情,而不是"高性能"的事情;很多时候,我们在做系统设计、技术选型的时候,都被“高性能”这三个字给绑架了,当然不是说性能不重要,你懂的。


目前很多高性能的基础网络服务器都是采用的C语言开发的,比如:Nginx、Redis、memcached等,它们都是基于”事件驱动 + 事件回掉函数”的方式实现,也就是采用epoll等作为网络收发数据包的核心驱动。不少人(包括我自己)都认为“事件驱动 + 事件回掉函数”的编程方法是“反人类”的;因为大多数人都更习惯线性的处理一件事情,做完第一件事情再做第二件事情,并不习惯在N件事情之间频繁的切换干活。为了解决程序员在开发服务器时需要自己的大脑不断的“上下文切换”的问题,Go语言引入了一种用户态线程goroutine来取代编写异步的事件回掉函数,从而重新回归到多线程并发模型的线性、同步的编程方式上。
用Go语言写一个最简单的echo服务器:
package main
import (
"log"
"net"
)
func main() {
ln, err := net.Listen("tcp", ":8080")
if err != nil {
        log.Println(err)
        return
}
for {
        conn, err := ln.Accept()
        if err != nil {
            log.Println(err)
            continue
        }
        go echoFunc(conn)
}
}
func echoFunc(c net.Conn) {
buf := make([]byte, 1024)
for {
        n, err := c.Read(buf)
        if err != nil {
            log.Println(err)
            return
        }
        c.Write(buf[:n])
}
}
main函数的过程就是首先创建一个监听套接字,然后用一个for循环不断的从监听套接字上Accept新的连接,最后调用echoFunc函数在建立的连接上干活。关键代码是:
go echoFunc(conn)
每收到一个新的连接,就创建一个“线程”去服务这个连接,因此所有的业务逻辑都可以同步、顺序的编写到echoFunc函数中,再也不用去关心网络IO是否会阻塞的问题。不管业务多复杂,Go语言的并发服务器的编程模型都是长这个样子。可以肯定的是,在linux上Go语言写的网络服务器也是采用的epoll作为最底层的数据收发驱动,Go语言网络的底层实现中同样存在“上下文切换”的工作,只是这个切换工作由runtime的调度器来做了,减少了程序员的负担。
弄明白网络库的底层实现,貌似只要弄清楚echo服务器中的Listen、Accept、Read、Write四个函数的底层实现关系就可以了。本文将采用自底向上的方式来介绍,也就是从最底层到上层的方式,这也是我阅读源码的方式。底层实现涉及到的核心源码文件主要有:
net/fd_unix.go 
net/fd_poll_runtime.go
runtime/netpoll.goc 
runtime/netpoll_epoll.c 
runtime/proc.c (调度器)
netpoll_epoll.c文件是Linux平台使用epoll作为网络IO多路复用的实现代码,这份代码可以了解到epoll相关的操作(比如:添加fd到epoll、从epoll删除fd等),只有4个函数,分别是runtime·netpollinit、runtime·netpollopen、runtime·netpollclose和runtime·netpoll。init函数就是创建epoll对象,open函数就是添加一个fd到epoll中,close函数就是从epoll删除一个fd,netpoll函数就是从epoll wait得到所有发生事件的fd,并将每个fd对应的goroutine(用户态线程)通过链表返回。用epoll写过程序的人应该都能理解这份代码,没什么特别之处。
void
runtime·netpollinit(void)
{
epfd = runtime·epollcreate1(EPOLL_CLOEXEC);
if(epfd >= 0)
return;
epfd = runtime·epollcreate(1024);
if(epfd >= 0) {
runtime·closeonexec(epfd);
return;
}
runtime·printf("netpollinit: failed to create descriptor (%d)\n", -epfd);
runtime·throw("netpollinit: failed to create descriptor");
}
runtime·netpollinit函数首先使用runtime·epollcreate1创建epoll实例,如果没有创建成功,就换用runtime·epollcreate再创建一次。这两个create函数分别等价于glibc的epoll_create1和epoll_create函数。只是因为Go语言并没有直接使用glibc,而是自己封装的系统调用,但功能是等价于glibc的。可以通过man手册查看这两个create的详细信息。
int32
runtime·netpollopen(uintptr fd, PollDesc *pd)
{
EpollEvent ev;
int32 res;
ev.events = EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET;
ev.data = (uint64)pd;
res = runtime·epollctl(epfd, EPOLL_CTL_ADD, (int32)fd, &ev);
return -res;
}
添加fd到epoll中的runtime·netpollopen函数可以看到每个fd一开始都关注了读写事件,并且采用的是边缘触发,除此之外还关注了一个不常见的新事件EPOLLRDHUP,这个事件是在较新的内核版本添加的,目的是解决对端socket关闭,epoll本身并不能直接感知到这个关闭动作的问题。注意任何一个fd在添加到epoll中的时候就关注了EPOLLOUT事件的话,就立马产生一次写事件,这次事件可能是多余浪费的。
epoll操作的相关函数都会在事件驱动的抽象层中去调用,为什么需要这个抽象层呢?原因很简单,因为Go语言需要跑在不同的平台上,有Linux、Unix、Mac OS X和Windows等,所以需要靠事件驱动的抽象层来为网络库提供一致的接口,从而屏蔽事件驱动的具体平台依赖实现。runtime/netpoll.goc源文件就是整个事件驱动抽象层的实现,抽象层的核心数据结构是:
struct PollDesc
{
PollDesc* link; // in pollcache, protected by pollcache.Lock
Lock; // protectes the following fields
uintptr fd;
bool closing;
uintptr seq; // protects from stale timers and ready notifications
G* rg; // G waiting for read or READY (binary semaphore)
Timer rt; // read deadline timer (set if rt.fv != nil)
int64 rd; // read deadline
G* wg; // the same for writes
Timer wt;
int64 wd;
};
每个添加到epoll中的fd都对应了一个PollDesc结构实例,PollDesc维护了读写此fd的goroutine这一非常重要的信息。可以大胆的推测一下,网络IO读写操作的实现应该是:当在一个fd上读写遇到EAGAIN错误的时候,就将当前goroutine存储到这个fd对应的PollDesc中,同时将goroutine给park住,直到这个fd上再此发生了读写事件后,再将此goroutine给ready激活重新运行。事实上的实现大概也是这个样子的。
事件驱动抽象层主要干的事情就是将具体的事件驱动实现(比如: epoll)通过统一的接口封装成Go接口供net库使用,主要的接口也是:创建事件驱动实例、添加fd、删除fd、等待事件以及设置DeadLine。runtime_pollServerInit负责创建事件驱动实例,runtime_pollOpen将分配一个PollDesc实例和fd绑定起来,然后将fd添加到epoll中,runtime_pollClose就是将fd从epoll中删除,同时将删除的fd绑定的PollDesc实例删除,runtime_pollWait接口是至关重要的,这个接口一般是在非阻塞读写发生EAGAIN错误的时候调用,作用就是park当前读写的goroutine。
runtime中的epoll事件驱动抽象层其实在进入net库后,又被封装了一次,这一次封装从代码上看主要是为了方便在纯Go语言环境进行操作,net库中的这次封装实现在net/fd_poll_runtime.go文件中,主要是通过pollDesc对象来实现的:
type pollDesc struct {
runtimeCtx uintptr
}
注意:此处的pollDesc对象不是上文提到的runtime中的PollDesc,相反此处pollDesc对象的runtimeCtx成员才是指向的runtime的PollDesc实例。pollDesc对象主要就是将runtime的事件驱动抽象层给再封装了一次,供网络fd对象使用。
var serverInit sync.Once
func (pd *pollDesc) Init(fd *netFD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
if errno != 0 {
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
pollDesc对象最需要关注的就是其Init方法,这个方法通过一个sync.Once变量来调用了runtime_pollServerInit函数,也就是创建epoll实例的函数。意思就是runtime_pollServerInit函数在整个进程生命周期内只会被调用一次,也就是只会创建一次epoll实例。epoll实例被创建后,会调用runtime_pollOpen函数将fd添加到epoll中。
网络编程中的所有socket fd都是通过netFD对象实现的,netFD是对网络IO操作的抽象,linux的实现在文件net/fd_unix.go中。netFD对象实现有自己的init方法,还有完成基本IO操作的Read和Write方法,当然除了这三个方法以外,还有很多非常有用的方法供用户使用。
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
sysfd       int
family      int
sotype      int
isConnected bool
net         string
laddr       Addr
raddr       Addr
// wait server
pd pollDesc
}
通过netFD对象的定义可以看到每个fd都关联了一个pollDesc实例,通过上文我们知道pollDesc对象最终是对epoll的封装。
func (fd *netFD) init() error {
if err := fd.pd.Init(fd); err != nil {
return err
}
return nil
}
netFD对象的init函数仅仅是调用了pollDesc实例的Init函数,作用就是将fd添加到epoll中,如果这个fd是第一个网络socket fd的话,这一次init还会担任创建epoll实例的任务。要知道在Go进程里,只会有一个epoll实例来管理所有的网络socket fd,这个epoll实例也就是在第一个网络socket fd被创建的时候所创建。
for {
n, err = syscall.Read(int(fd.sysfd), p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
err = chkReadErr(n, err, fd)
break
}
上面代码段是从netFD的Read方法中摘取,重点关注这个for循环中的syscall.Read调用的错误处理。当有错误发生的时候,会检查这个错误是否是syscall.EAGAIN,如果是,则调用WaitRead将当前读这个fd的goroutine给park住,直到这个fd上的读事件再次发生为止。当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行。这样的实现,就让调用netFD的Read的地方变成了同步“阻塞”方式编程,不再是异步非阻塞的编程方式了。netFD的Write方法和Read的实现原理是一样的,都是在碰到EAGAIN错误的时候将当前goroutine给park住直到socket再次可写为止。
本文只是将网络库的底层实现给大体上引导了一遍,知道底层代码大概实现在什么地方,方便结合源码深入理解。Go语言中的高并发、同步阻塞方式编程的关键其实是”goroutine和调度器”,针对网络IO的时候,我们需要知道EAGAIN这个非常关键的调度点,掌握了这个调度点,即使没有调度器,自己也可以在epoll的基础上配合协程等用户态线程实现网络IO操作的调度,达到同步阻塞编程的目的。
最后,为什么需要同步阻塞的方式编程?只有看多、写多了异步非阻塞代码的时候才能够深切体会到这个问题。真正的高大上绝对不是——“别人不会,我会;别人写不出来,我写得出来。”

http://ju.outofmemory.cn/entry/168649
本文分析了Golang的socket文件描述符和goroutine阻塞调度的原理。代码中大部分是Go代码,小部分是汇编代码。完整理解本文需要Go语言知识,并且用Golang写过网络程序。更重要的是,需要提前理解goroutine的调度原理。
1. TCP的连接对象:
连接对象:
在net.go中有一个名为Conn的接口,提供了对于连接的读写和其他操作:
type Conn interface {
    Read(b []byte) (n int, err error)
    Write(b []byte) (n int, err error)
    Close() error
    LocalAddr() Addr
    RemoteAddr() Addr
    SetReadDeadline(t time.Time) error
    SetWriteDeadline(t time.Time) error
}
这个接口就是对下面的结构体conn的抽象。conn结构体包含了对连接的读写和其他操作:
type conn struct {
    fd *netFD
}
从连接读取数据:
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Read(b)
}
向连接写入数据:
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Write(b)
}
关闭连接:
// Close closes the connection.
func (c *conn) Close() error {
    if !c.ok() {
        return syscall.EINVAL
    }
    return c.fd.Close()
}
设置读写超时:
// SetDeadline implements the Conn SetDeadline method.
func (c *conn) SetDeadline(t time.Time) error {
    if !c.ok() {
        return syscall.EINVAL
    }
    return c.fd.setDeadline(t)
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func (c *conn) SetReadDeadline(t time.Time) error {
    if !c.ok() {
        return syscall.EINVAL
    }
    return c.fd.setReadDeadline(t)
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
func (c *conn) SetWriteDeadline(t time.Time) error {
    if !c.ok() {
        return syscall.EINVAL
    }
    return c.fd.setWriteDeadline(t)
}
可以看到,对连接的所有操作,都体现在对*netFD的操作上。我们继续跟踪c.fd.Read()函数.
2.文件描述符
net/fd_unix.go:
网络连接的文件描述符:
// Network file descriptor.
type netFD struct {
    // locking/lifetime of sysfd + serialize access to Read and Write methods
    fdmu fdMutex
    // immutable until Close
    sysfd       int
    family      int
    sotype      int
    isConnected bool
    net         string
    laddr       Addr
    raddr       Addr
    // wait server
    pd pollDesc
}
文件描述符读取数据:
func (fd *netFD) Read(p []byte) (n int, err error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if err := fd.pd.PrepareRead(); err != nil {
        return 0, &OpError{"read", fd.net, fd.raddr, err}
    }
    // 调用system call,循环从fd.sysfd读取数据
    for {
        // 系统调用Read读取数据
        n, err = syscall.Read(int(fd.sysfd), p)
        // 如果发生错误,则需要处理
        // 并且只处理EAGAIN类型的错误,其他错误一律返回给调用者
        if err != nil {
            n = 0
            // 对于非阻塞的网络连接的文件描述符,如果错误是EAGAIN
            // 说明Socket的缓冲区为空,未读取到任何数据
            // 则调用fd.pd.WaitRead,
            if err == syscall.EAGAIN {
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            }
        }
        err = chkReadErr(n, err, fd)
        break
    }
    if err != nil && err != io.EOF {
        err = &OpError{"read", fd.net, fd.raddr, err}
    }
    return
}
网络轮询器
网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。
// Integrated network poller (platform-independent part).
// 网络轮询器(平台独立部分)
// A particular implementation (epoll/kqueue) must define the following functions:
// 实际的实现(epoll/kqueue)必须定义以下函数:
// func netpollinit()           // to initialize the poller,初始化轮询器
// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 为fd和pd启动边缘触发通知
// and associate fd with pd.
// 一个实现必须调用下面的函数,用来指示pd已经准备好
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(gpp **g, pd *pollDesc, mode int32)
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pollDesc包含了2个二进制的信号,分别负责读写goroutine的暂停.
// 信号可能处于下面的状态:
// pdReady - IO就绪通知被挂起;
//           一个goroutine将次状态置为nil来消费一个通知。
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - 一个goroutine准备暂停在信号上,但是还没有完成暂停。
// 这个goroutine通过把这个状态改变为G指针去提交这个暂停动作。
// 或者,替代性的,并行的其他通知将状态改变为READY.
// 或者,替代性的,并行的超时/关闭会将次状态变为nil
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to READY,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G指针 - 阻塞在信号上的goroutine
// IO通知或者超时/关闭会分别将此状态置为READY或者nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to READY or nil respectively
//             and unparks the goroutine.
// nil - nothing of the above.
const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)
网络轮询器的数据结构如下:
// Network poller descriptor.
// 网络轮询器描述符
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock
    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    //
    // lock锁对象保护了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。
    // 而这些操作又完全包含了对seq, rt, tw变量。
    // fd在PollDesc整个生命过程中都是一个常量。
    // 处理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就绪通知)不需要用到锁。
    // 所以closing, rg, rd, wg和wd的所有操作都是一个无锁的操作。
    lock    mutex // protectes the following fields
    fd      uintptr
    closing bool
    seq     uintptr        // protects from stale timers and ready notifications
    rg      uintptr        // pdReady, pdWait, G waiting for read or nil
    rt      timer          // read deadline timer (set if rt.f != nil)
    rd      int64          // read deadline
    wg      uintptr        // pdReady, pdWait, G waiting for write or nil
    wt      timer          // write deadline timer
    wd      int64          // write deadline
    user    unsafe.Pointer // user settable cookie
}
将当前goroutine设置为阻塞在fd上:
pd.WaitRead():
func (pd *pollDesc) WaitRead() error {
    return pd.Wait('r')
}
func (pd *pollDesc) Wait(mode int) error {
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res)
}
res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:
func convertErr(res int) error {
    switch res {
    case 0:
        return nil
    case 1:
        return errClosing
    case 2:
        return errTimeout
    }
    println("unreachable: ", res)
    panic("unreachable")
}
函数返回0,表示IO已经准备好,返回nil。
返回1,说明连接已关闭,应该放回errClosing。
返回2,说明对IO进行的操作发生超时,应该返回errTimeout。
runtime_pollWait会调用runtime/thunk.s中的函数:
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0
    JMP runtime·netpollWait(SB)
这是一个包装函数,没有参数,直接跳转到runtime/netpoll.go中的函数netpollWait:
func netpollWait(pd *pollDesc, mode int) int {
    // 检查pd的状态是否异常
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // As for now only Solaris uses level-triggered IO.
    if GOOS == "solaris" {
        onM(func() {
            netpollarm(pd, mode)
        })
    }
    // 循环中检查pd的状态是不是已经被设置为pdReady
    // 即检查IO是不是已经就绪
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return 0
}
netpollcheckerr函数检查pd是否出现异常:
// 检查pd的异常
func netpollcheckerr(pd *pollDesc, mode int32) int {
    // 是否已经关闭
    if pd.closing {
        return 1 // errClosing
    }
    // 当读写状态下,deadline小于0,表示pd已经过了超时时间
    if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
        return 2 // errTimeout
    }
    // 正常情况返回0
    return 0
}
netpollblock():
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// 这个函数被netpollWait循环调用
// 返回true说明IO已经准备好,返回false说明IO操作已经超时或者已经关闭
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // 获取pd的rg
    gpp := &pd.rg
    // 如果模式是w,则获取pd的wg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // set the gpp semaphore to WAIT
    // 在循环中设置pd的gpp为pdWait
    // 因为casuintptr是自旋锁,所以需要在循环中调用
    for {
        // 如果在循环中发现IO已经准备好(pg的rg或者wg为pdReady状态)
        // 则设置rg/wg为0,返回true
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        // 每次netpollblock执行完毕之后,gpp重置为0
        // 非0表示重复wait
        if old != 0 {
            gothrow("netpollblock: double wait")
        }
        // CAS操作改变gpp为pdWait
        if casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    // need to recheck error states after setting gpp to WAIT
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
    //
    // 当设置gpp为pdWait状态后,重新检查gpp的状态
    // 这是必要的,因为runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl会做相反的操作
    // 如果状态正常则挂起当前的goroutine
    //
    // 当netpollcheckerr检查io出现超时或者错误,waitio为true可用于等待ioReady
    // 否则当waitio为false, 且io不出现错误或者超时才会挂起当前goroutine
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 解锁函数,设置gpp为pdWait,如果设置不成功
        // 说明已经是发生其他事件,可以让g继续运行,而不是挂起当前g
        f := netpollblockcommit
        // 尝试挂起当前g
        gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
    }
    // be careful to not lose concurrent READY notification
    old := xchguintptr(gpp, 0)
    if old > pdWait {
        gothrow("netpollblock: corrupted state")
    }
    return old == pdReady
}
runtime/proc.go: gopark():
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// 将当前goroutine置为waiting状态,然后调用unlockf
func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
    // 获取当前M
    mp := acquirem()
    // 获取当前G
    gp := mp.curg
    // 获取G的状态
    status := readgstatus(gp)
    // 如果不是_Grunning或者_Gscanrunning,则报错
    if status != _Grunning && status != _Gscanrunning {
        gothrow("gopark: bad g status")
    }
    // 设置lock和unlockf
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    // 在m->g0这个栈上调用park_m,而不是当前g的栈
    mcall(park_m)
}
mcall函数是一段汇编,在m->g0的栈上调用park_m,而不是在当前goroutine的栈上。mcall的功能分两部分,第一部分保存当前G的PC/SP到G的gobuf的pc/sp字段,第二部分调用park_m函数:
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return.  It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall(SB), NOSPLIT, $0-8
    // 将需要执行的函数保存在DI
    MOVQ    fn+0(FP), DI
    // 将M的TLS存放在CX
    get_tls(CX)
    // 将G对象存放在AX
    MOVQ    g(CX), AX   // save state in g->sched
    // 将调用者的PC存放在BX
    MOVQ    0(SP), BX   // caller's PC
    // 将调用者的PC保存到g->sched.pc
    MOVQ    BX, (g_sched+gobuf_pc)(AX)
    // 第一个参数的地址,即栈顶的地址,保存到BX
    LEAQ    fn+0(FP), BX    // caller's SP
    // 保存SP的地址到g->sched.sp
    MOVQ    BX, (g_sched+gobuf_sp)(AX)
    // 将g对象保存到g->sched->g
    MOVQ    AX, (g_sched+gobuf_g)(AX)
    // switch to m->g0 & its stack, call fn
    // 将g对象指针保存到BX
    MOVQ    g(CX), BX
    // 将g->m保存到BX
    MOVQ    g_m(BX), BX
    // 将m->g0保存到SI
    MOVQ    m_g0(BX), SI
    CMPQ    SI, AX  // if g == m->g0 call badmcall
    JNE 3(PC)
    MOVQ    $runtime·badmcall(SB), AX
    JMP AX
    // 将m->g0保存到g
    MOVQ    SI, g(CX)   // g = m->g0
    // 将g->sched.sp恢复到SP寄存器
    // 即使用g0的栈
    MOVQ    (g_sched+gobuf_sp)(SI), SP  // sp = m->g0->sched.sp
    // AX进栈
    PUSHQ   AX
    MOVQ    DI, DX
    // 将fn的地址复制到DI
    MOVQ    0(DI), DI
    // 调用函数
    CALL    DI
    // AX出栈
    POPQ    AX
    MOVQ    $runtime·badmcall2(SB), AX
    JMP AX
    RET
park_m函数的功能分为三部分,第一部分让当前G和当前M脱离关系,第二部分是调用解锁函数,这里是调用netpoll.go源文件中的netpollblockcommit函数:
// runtime·park continuation on g0.
void
runtime·park_m(G *gp)
{
    bool ok;
    // 设置当前g为Gwaiting状态
    runtime·casgstatus(gp, Grunning, Gwaiting);
    // 让当前g和m脱离关系
    dropg();
    if(g->m->waitunlockf) {
        ok = g->m->waitunlockf(gp, g->m->waitlock);
        g->m->waitunlockf = nil;
        g->m->waitlock = nil;
        // 返回0为false,非0为true
        // 0说明g->m->waitlock发生了变化,即不是在gopark是设置的(pdWait)
        // 说明了脱离了WAIT状态,应该设置为Grunnable,并执行g
        if(!ok) {
            runtime·casgstatus(gp, Gwaiting, Grunnable);
            execute(gp);  // Schedule it back, never returns.
        }
    }
    // 这里是调度当前m继续执行其他g
    // 而不是上面执行execute
    schedule();
}
netpollblockcommit函数,设置gpp为pdWait,设置成功返回1,否则返回0。1为true,0为false:
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
}
到这里当前goroutine对socket文件描述符的等待IO继续的行为已经完成。过程中首先尽早尝试判断IO是否已经就绪,如果未就绪则挂起当前goroutine,挂起之后再次判断IO是否就绪,如果还未就绪则调度当前M运行其他G。如果是在调度goroutine之前IO已经就绪,则不会使当前goroutine进入调度队列,会直接运行刚才挂起的G。否则当前goroutine会进入调度队列。
接下来是等待runtime将其唤醒。runtime在执行findrunnablequeue、starttheworld,sysmon函数时,都会调用netpoll_epoll.go中的netpoll函数,寻找到IO就绪的socket文件描述符,并找到这些socket文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些socket文件描述符就绪的goroutine状态修改为Grunnable。在以上函数中,执行完netpoll之后,会找到一个就绪的goroutine列表,接下来将就绪的goroutine加入到调度队列中,等待调度运行。
在netpoll_epoll.go中的netpoll函数中,epoll_wait函数返回N个发生事件的文件描述符对应的epollevent,接着对于每个event使用其data属性,将event.data转换为*pollDesc类型,再调用netpoll.go中的netpollready函数,将*pollDesc类型中的G数据类型去除,并附加到netpoll函数的调用者传递的G链表中:
// 将ev.data转换为*pollDesc类型
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
// 调用netpollready将取出pd中保存的G,并添加到链表中
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
所以runtime在执行findrunnablequeue、starttheworld,sysmon函数中会执行netpoll函数,并返回N个goroutine。这些goroutine期待的网络事件已经发生,runtime会将这些goroutine放入到当前P的可运行队列中,接下来调度它们并运行。
posted on 2017-06-02 11:12 思月行云 阅读(810) 评论(0)  编辑 收藏 引用 所属分类: Golang

只有注册用户登录后才能发表评论。
【推荐】超50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理