Fork me on GitHub
随笔 - 209  文章 - 13  trackbacks - 0
<2017年12月>
262728293012
3456789
10111213141516
17181920212223
24252627282930
31123456


专注即时通讯及网游服务端编程
------------------------------------
Openresty之lua-nginx-module
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 56529
  • 排名 - 327

最新评论

阅读排行榜

KCP-GO源码解析

概念

ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correction

kcp简介

kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp

kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。
kcp和tcp一样,也分客户端和监听端。

    +-+-+-+-+-+            +-+-+-+-+-+
    
|  Client |            |  Server |
    
+-+-+-+-+-+            +-+-+-+-+-+
        
|------ kcp data ------>|     
        
|<----- kcp data -------|    

kcp协议
layer model
+----------------------+
|      Session         |
+----------------------+
|      KCP(ARQ)        |
+----------------------+
|      FEC(OPTIONAL)   |
+----------------------+
|      CRYPTO(OPTIONAL)|
+----------------------+
|      UDP(Packet)     |
+----------------------+

KCP header

KCP Header Format

      4           1   1     2 (Byte)
+---+---+---+---+---+---+---+---+
|     conv      |cmd|frg|  wnd  |
+---+---+---+---+---+---+---+---+
|     ts        |     sn        |
+---+---+---+---+---+---+---+---+
|     una       |     len       |
+---+---+---+---+---+---+---+---+
|                               |
+             DATA              +
|                               |
+---+---+---+---+---+---+---+---+

代码结构
src/vendor/github.com/xtaci/kcp-go/
├── LICENSE
├── README.md
├── crypt.go    加解密实现
├── crypt_test.go
├── donate.png
├── fec.go      向前纠错实现
├── frame.png
├── kcp
-go.png
├── kcp.go      kcp协议实现
├── kcp_test.go
├── sess.go     会话管理实现
├── sess_test.go
├── snmp.go     数据统计实现
├── updater.go  任务调度实现
├── xor.go      xor封装
└── xor_test.go

着重研究两个文件kcp.gosess.go

kcp浅析

kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。

kcp client整体函数流

和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。

DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
    V
net.DialUDP(
"udp", nil, udpaddr)
    V
NewConn()
    V
newUDPSession() {初始化UDPSession}
    V
NewKCP() {初始化kcp}
    V
updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
    V
go sess.readLoop()
    V
go s.receiver(chPacket)
    V
s.kcpInput(data)
    V
s.fecDecoder.decodeBytes(data)
    V
s.kcp.Input(data, 
true, s.ackNoDelay)
    V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
    V
notifyReadEvent()

客户端大体的流程如上面所示,先Dial,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。

kcp server整体函数流

ListenWithOptions() 

    V
net.ListenUDP()
    V
ServerConn()
    V
newFECDecoder()
    V
go l.monitor() {从chPacket接收udp数据,写入kcp}
    V
go l.receiver(chPacket) {从upd接收数据,并入队列}
    V
newUDPSession()
    V
updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
    V
s.kcpInput(data)`
    V
s.fecDecoder.decodeBytes(data)
    V
s.kcp.Input(data, 
true, s.ackNoDelay)
    V
kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
    V
notifyReadEvent()


服务端的大体流程如上图所示,先Listen,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。

kcp 数据流详细解析

不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。
kcp client 发送消息

s.Write(b []byte
    V
s.kcp.WaitSnd() {}
    V
s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
     V
s.kcp.flush(
false) [flush data to output] {
    
if writeDelay==true {
        flush
    }
else{
        每隔`interval`时间flush一次
    }
}
     V
kcp.output(buffer, size) 
     V
s.output(buf)
     V
s.conn.WriteTo(ext, s.remote)
     V
s.conn..Conn.WriteTo(buf)

读写都是在sess.go文件中实现的,Write方法:

// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {
    
for {
        

        
// api flow control
        if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
            n 
= len(b)
            
for {
                
if len(b) <= int(s.kcp.mss) {
                    s.kcp.Send(b)
                    
break
                } 
else {
                    s.kcp.Send(b[:s.kcp.mss])
                    b 
= b[s.kcp.mss:]
                }
            }

            
if !s.writeDelay {
                s.kcp.flush(
false)
            }
            s.mu.Unlock()
            atomic.AddUint64(
&DefaultSnmp.BytesSent, uint64(n))
            
return n, nil
        }

        
        
// wait for write event or timeout
        select {
        
case <-s.chWriteEvent:
        
case <-c:
        
case <-s.die:
        }

        
if timeout != nil {
            timeout.Stop()
        }
    }
}

假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send(“hello”),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。

func (kcp *KCP) Send(buffer []byteint {
    
    
for i := 0; i < count; i++ {
        var size 
int
        
if len(buffer) > int(kcp.mss) {
            size 
= int(kcp.mss)
        } 
else {
            size 
= len(buffer)
        }
        seg :
= kcp.newSegment(size)
        copy(seg.data, buffer[:size])
        
if kcp.stream == 0 { // message mode
            seg.frg = uint8(count - i - 1)
        } 
else { // stream mode
            seg.frg = 0
        }
        kcp.snd_queue 
= append(kcp.snd_queue, seg)
        buffer 
= buffer[size:]
    }
    
return 0
}

接着判断参数writeDelay,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。

// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
    var seg Segment
    seg.conv 
= kcp.conv
    seg.cmd 
= IKCP_CMD_ACK
    seg.wnd 
= kcp.wnd_unused()
    seg.una 
= kcp.rcv_nxt

    buffer :
= kcp.buffer
    
// flush acknowledges
    ptr := buffer
    
for i, ack := range kcp.acklist {
        size :
= len(buffer) - len(ptr)
        
if size+IKCP_OVERHEAD > int(kcp.mtu) {
            kcp.output(buffer, size)
            ptr 
= buffer
        }
        
// filter jitters caused by bufferbloat
        if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
            seg.sn, seg.ts 
= ack.sn, ack.ts
            ptr 
= seg.encode(ptr)

        }
    }
    kcp.acklist 
= kcp.acklist[0:0]

    
if ackOnly { // flash remain ack segments
        size := len(buffer) - len(ptr)
        
if size > 0 {
            kcp.output(buffer, size)
        }
        
return
    }

    
// probe window size (if remote window size equals zero)
    if kcp.rmt_wnd == 0 {
        current :
= currentMs()
        
if kcp.probe_wait == 0 {
            kcp.probe_wait 
= IKCP_PROBE_INIT
            kcp.ts_probe 
= current + kcp.probe_wait
        } 
else {
            
if _itimediff(current, kcp.ts_probe) >= 0 {
                
if kcp.probe_wait < IKCP_PROBE_INIT {
                    kcp.probe_wait 
= IKCP_PROBE_INIT
                }
                kcp.probe_wait 
+= kcp.probe_wait / 2
                
if kcp.probe_wait > IKCP_PROBE_LIMIT {
                    kcp.probe_wait 
= IKCP_PROBE_LIMIT
                }
                kcp.ts_probe 
= current + kcp.probe_wait
                kcp.probe 
|= IKCP_ASK_SEND
            }
        }
    } 
else {
        kcp.ts_probe 
= 0
        kcp.probe_wait 
= 0
    }

    
// flush window probing commands
    if (kcp.probe & IKCP_ASK_SEND) != 0 {
        seg.cmd 
= IKCP_CMD_WASK
        size :
= len(buffer) - len(ptr)
        
if size+IKCP_OVERHEAD > int(kcp.mtu) {
            kcp.output(buffer, size)
            ptr 
= buffer
        }
        ptr 
= seg.encode(ptr)
    }

    
// flush window probing commands
    if (kcp.probe & IKCP_ASK_TELL) != 0 {
        seg.cmd 
= IKCP_CMD_WINS
        size :
= len(buffer) - len(ptr)
        
if size+IKCP_OVERHEAD > int(kcp.mtu) {
            kcp.output(buffer, size)
            ptr 
= buffer
        }
        ptr 
= seg.encode(ptr)
    }

    kcp.probe 
= 0

    
// calculate window size
    cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
    
if kcp.nocwnd == 0 {
        cwnd 
= _imin_(kcp.cwnd, cwnd)
    }

    
// sliding window, controlled by snd_nxt && sna_una+cwnd
    newSegsCount := 0
    
for k := range kcp.snd_queue {
        
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
            
break
        }
        newseg :
= kcp.snd_queue[k]
        newseg.conv 
= kcp.conv
        newseg.cmd 
= IKCP_CMD_PUSH
        newseg.sn 
= kcp.snd_nxt
        kcp.snd_buf 
= append(kcp.snd_buf, newseg)
        kcp.snd_nxt
++
        newSegsCount
++
        kcp.snd_queue[k].data 
= nil
    }
    
if newSegsCount > 0 {
        kcp.snd_queue 
= kcp.remove_front(kcp.snd_queue, newSegsCount)
    }

    
// calculate resent
    resent := uint32(kcp.fastresend)
    
if kcp.fastresend <= 0 {
        resent 
= 0xffffffff
    }

    
// check for retransmissions
    current := currentMs()
    var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
    
for k := range kcp.snd_buf {
        segment :
= &kcp.snd_buf[k]
        needsend :
= false
        
if segment.xmit == 0 { // initial transmit
            needsend = true
            segment.rto 
= kcp.rx_rto
            segment.resendts 
= current + segment.rto
        } 
else if _itimediff(current, segment.resendts) >= 0 { // RTO
            needsend = true
            
if kcp.nodelay == 0 {
                segment.rto 
+= kcp.rx_rto
            } 
else {
                segment.rto 
+= kcp.rx_rto / 2
            }
            segment.resendts 
= current + segment.rto
            lost
++
            lostSegs
++
        } 
else if segment.fastack >= resent { // fast retransmit
            needsend = true
            segment.fastack 
= 0
            segment.rto 
= kcp.rx_rto
            segment.resendts 
= current + segment.rto
            change
++
            fastRetransSegs
++
        } 
else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
            needsend = true
            segment.fastack 
= 0
            segment.rto 
= kcp.rx_rto
            segment.resendts 
= current + segment.rto
            change
++
            earlyRetransSegs
++
        }

        
if needsend {
            segment.xmit
++
            segment.ts 
= current
            segment.wnd 
= seg.wnd
            segment.una 
= seg.una

            size :
= len(buffer) - len(ptr)
            need :
= IKCP_OVERHEAD + len(segment.data)

            
if size+need > int(kcp.mtu) {
                kcp.output(buffer, size)
                current 
= currentMs() // time update for a blocking call
                ptr = buffer
            }

            ptr 
= segment.encode(ptr)
            copy(ptr, segment.data)
            ptr 
= ptr[len(segment.data):]

            
if segment.xmit >= kcp.dead_link {
                kcp.state 
= 0xFFFFFFFF
            }
        }
    }

    
// flash remain segments
    size := len(buffer) - len(ptr)
    
if size > 0 {
        kcp.output(buffer, size)
    }

    
// counter updates
    sum := lostSegs
    
if lostSegs > 0 {
        atomic.AddUint64(
&DefaultSnmp.LostSegs, lostSegs)
    }
    
if fastRetransSegs > 0 {
        atomic.AddUint64(
&DefaultSnmp.FastRetransSegs, fastRetransSegs)
        sum 
+= fastRetransSegs
    }
    
if earlyRetransSegs > 0 {
        atomic.AddUint64(
&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
        sum 
+= earlyRetransSegs
    }
    
if sum > 0 {
        atomic.AddUint64(
&DefaultSnmp.RetransSegs, sum)
    }

    
// update ssthresh
    
// rate halving, https://tools.ietf.org/html/rfc6937
    if change > 0 {
        inflight :
= kcp.snd_nxt - kcp.snd_una
        kcp.ssthresh 
= inflight / 2
        
if kcp.ssthresh < IKCP_THRESH_MIN {
            kcp.ssthresh 
= IKCP_THRESH_MIN
        }
        kcp.cwnd 
= kcp.ssthresh + resent
        kcp.incr 
= kcp.cwnd * kcp.mss
    }

    
// congestion control, https://tools.ietf.org/html/rfc5681
    if lost > 0 {
        kcp.ssthresh 
= cwnd / 2
        
if kcp.ssthresh < IKCP_THRESH_MIN {
            kcp.ssthresh 
= IKCP_THRESH_MIN
        }
        kcp.cwnd 
= 1
        kcp.incr 
= kcp.mss
    }

    
if kcp.cwnd < 1 {
        kcp.cwnd 
= 1
        kcp.incr 
= kcp.mss
    }
}

flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送ack,如果ackOnly为true的话,该函数只遍历ack列表,然后发送,就完事了。 如果不是,也会发送真实数据。 在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。
接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:
1. 如果这个段数据首次发送,则直接发送数据。 2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。 3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend参数决定。 4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER

最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go的:

func (s *UDPSession) output(buf []byte) {
    var ecc [][]
byte

    
// extend buf's header space
    ext := buf
    
if s.headerSize > 0 {
        ext 
= s.ext[:s.headerSize+len(buf)]
        copy(ext[s.headerSize:], buf)
    }

    
// FEC stage
    if s.fecEncoder != nil {
        ecc 
= s.fecEncoder.Encode(ext)
    }

    
// encryption stage
    if s.block != nil {
        io.ReadFull(rand.Reader, ext[:nonceSize])
        checksum :
= crc32.ChecksumIEEE(ext[cryptHeaderSize:])
        binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
        s.block.Encrypt(ext, ext)

        
if ecc != nil {
            
for k := range ecc {
                io.ReadFull(rand.Reader, ecc[k][:nonceSize])
                checksum :
= crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
                binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
                s.block.Encrypt(ecc[k], ecc[k])
            }
        }
    }

    
// WriteTo kernel
    nbytes := 0
    npkts :
= 0
    
// if mrand.Intn(100) < 50 {
    for i := 0; i < s.dup+1; i++ {
        
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
            nbytes 
+= n
            npkts
++
        }
    }
    
// }

    
if ecc != nil {
        
for k := range ecc {
            
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
                nbytes 
+= n
                npkts
++
            }
        }
    }
    atomic.AddUint64(
&DefaultSnmp.OutPkts, uint64(npkts))
    atomic.AddUint64(
&DefaultSnmp.OutBytes, uint64(nbytes))
}

output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。

kcp的任务调度器

其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理session,代码文件为updater.go。其中最主要的函数

func (h *updateHeap) updateTask() {
    var timer 
<-chan time.Time
    
for {
        select {
        
case <-timer:
        
case <-h.chWakeUp:
        }

        h.mu.Lock()
        hlen :
= h.Len()
        now :
= time.Now()
        
if hlen > 0 && now.After(h.entries[0].ts) {
            
for i := 0; i < hlen; i++ {
                entry :
= heap.Pop(h).(entry)
                
if now.After(entry.ts) {
                    entry.ts 
= now.Add(entry.s.update())
                    heap.Push(h, entry)
                } 
else {
                    heap.Push(h, entry)
                    
break
                }
            }
        }
        
if hlen > 0 {
            timer 
= time.After(h.entries[0].ts.Sub(now))
        }
        h.mu.Unlock()
    }
}

任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。

总结

这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。

posted on 2017-12-09 15:20 思月行云 阅读(136) 评论(0)  编辑 收藏 引用 所属分类: Golang

只有注册用户登录后才能发表评论。
【推荐】超50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理