nbio原理解析
之前更新的一系列,好久没更新了,差点烂尾,我不允许这样的事情在我身上发生(虽然已经烂尾好几次了😭)。
在上一篇文章中,我们探讨了基于 epoll 的 Go Netpoll 框架的早期实现——evio。我们还指出了它存在的一些问题。在本篇文章中,我们将继续深入分析另一个高性能的网络编程框架:nbio。
nbio项目里也包含了在nbio之上构建的nbhttp,这个不在我们讨论范围。
nbio同样采用了经典的Reactor模式,事实上,Go语言中的许多异步网络框架都是基于这种模式设计的。
老规矩,先运行nbio程序代码,
Server:
package main
import (
"fmt"
"github.com/lesismal/nbio"
)
func main() {
g := nbio.NewGopher(nbio.Config{
Network: "tcp",
Addrs: []string{":8888"},
MaxWriteBufferSize: 6 * 1024 * 1024,
})
g.OnData(func(c *nbio.Conn, data []byte) {
c.Write(append([]byte{}, data...))
})
err := g.Start()
if err != nil {
fmt.Printf("nbio.Start failed: %v\n", err)
return
}
defer g.Stop()
g.Wait()
}
使用nbio.NewGopher()函数创建一个新的Engine实例。传入nbio.Config结构体来配置 Engine 实例,包括:
- Network: 使用的网络类型,这里是tcp。
- Addrs: 服务器要监听的地址和端口,这里是 “:8888”(即监听本机的8888端口)。
- MaxWriteBufferSize: 写缓冲区的最大大小,这里设置为 6MB。
其他配置可以自行查看。然后使用g.OnData()方法为 Engine实例注册一个数据接收回调函数。这个回调函数在收到数据时被调用。回调函数接收两个参数:连接对象c和收到的数据 data。在回调函数内部,我们使用c.Write()方法将收到的数据原样写回给客户端。
Client:
package main
import (
"bytes"
"context"
"fmt"
"math/rand"
"time"
"github.com/lesismal/nbio"
"github.com/lesismal/nbio/logging"
)
func main() {
var (
ret []byte
buf = make([]byte, 1024*1024*4)
addr = "localhost:8888"
ctx, _ = context.WithTimeout(context.Background(), 60*time.Second)
)
logging.SetLevel(logging.LevelInfo)
rand.Read(buf)
g := nbio.NewGopher(nbio.Config{})
done := make(chan int)
g.OnData(func(c *nbio.Conn, data []byte) {
ret = append(ret, data...)
if len(ret) == len(buf) {
if bytes.Equal(buf, ret) {
close(done)
}
}
})
err := g.Start()
if err != nil {
fmt.Printf("Start failed: %v\n", err)
}
defer g.Stop()
c, err := nbio.Dial("tcp", addr)
if err != nil {
fmt.Printf("Dial failed: %v\n", err)
}
g.AddConn(c)
c.Write(buf)
select {
case <-ctx.Done():
logging.Error("timeout")
case <-done:
logging.Info("success")
}
}
乍一看有点麻烦。其实是服务端和客户端共用了一套结构。
客户端通过nbio.Dial连接服务端,连接成功封装成nbio.Conn,这里的nbio.Conn是实现了标准库的net.Conn 接口的。最后AddConn这个conn。调用Write往服务端写数据,当服务端接收到数据后,Server端的处理逻辑是把数据原样发送给客户端,当客户端接收到数据一样OnData会被回调,上面OnData判断接受到的数据和发送的数据是不是长度一样,最后客户端主动关闭这个连接。
下面来看几个主要结构。
type Engine struct {
//..
sync.WaitGroup
//..
mux sync.Mutex
wgConn sync.WaitGroup
network string
addrs []string
listen func(network, addr string) (net.Listener, error)
pollerNum int
readBufferSize int
maxWriteBufferSize int
maxConnReadTimesPerEventLoop int
udpReadTimeout time.Duration
epollMod uint32
/..
connsStd map[*Conn]struct{}
connsUnix []*Conn
listeners []*poller
pollers []*poller
onOpen func(c *Conn)
onClose func(c *Conn, err error)
onRead func(c *Conn)
onData func(c *Conn, data []byte)
onReadBufferAlloc func(c *Conn) []byte
onReadBufferFree func(c *Conn, buffer []byte)
//...
}
Engine本质上就是核心管理器。会管理所有的listener poller 和worker poller。
这两种poller有什么区别吗?
区别在职责上。
listener poller只负责accept新的连接,当一个新的客户端conn到来时,会从pollers挑选一个worker poller,然后把conn加入到对应的worker poller,之后worker poller负责处理此conn的读写事件。
所以当我们启动程序的时候,如果只监听一个地址的情况下,那么程序的poll数= 1(listener poller) + pollerNum。
从上面的字段也可以看出,你可以自定义一些配置和回调。比如你可以设置当新连接到来时的回调函数onOpen,也可以设置一个conn数据到来时的回调函数onData等。
type Conn struct {
mux sync.Mutex
p *poller
fd int
//...
writeBuffer []byte
typ ConnType
closed bool
isWAdded bool
closeErr error
lAddr net.Addr
rAddr net.Addr
ReadBuffer []byte
//...
DataHandler func(c *Conn, data []byte)
}
Conn结构体,用于表示一个网络连接。一个conn只属于一个poller。对应的writeBuffer:当数据一次没写完时,剩下的先存在writeBuffer,等待下次可写事件到来继续写入。
type poller struct {
g *Engine
epfd int
evtfd int
index int
shutdown bool
listener net.Listener
isListener bool
unixSockAddr string
ReadBuffer []byte
pollType string
}
至于poller结构,这里就是一个抽象的概念,用于管理底层的多路复用 I/O(如linux的epoll、darwin上的kqueue 等)
注意这里的pollType,nbio默认epoll采用的是水平触发(LT),当然用户也可以设置成边缘触发(ET)。
介绍完基本的结构,接下来进入代码的流程。
上面服务端的代码,当你调用Start启动程序,
func (g *Engine) Start() error {
//....
switch g.network {
//第一部分 初始化listener
case "unix", "tcp", "tcp4", "tcp6":
for i := range g.addrs {
// listener poller
ln, err := newPoller(g, true, i)
if err != nil {
for j := 0; j < i; j++ {
g.listeners[j].stop()
}
return err
}
g.addrs[i] = ln.listener.Addr().String()
g.listeners = append(g.listeners, ln)
}
//.....
//第二部分 初始化一定数量poller
for i := 0; i < g.pollerNum; i++ {
// rw poller
p, err := newPoller(g, false, i)
if err != nil {
for j := 0; j < len(g.listeners); j++ {
g.listeners[j].stop()
}
for j := 0; j < i; j++ {
g.pollers[j].stop()
}
return err
}
g.pollers[i] = p
}
//第三部分 启动所有的worker poller
for i := 0; i < g.pollerNum; i++ {
g.pollers[i].ReadBuffer = make([]byte, g.readBufferSize)
g.Add(1)
go g.pollers[i].start()
}
// 第四部分 启动所有的listenr
for _, l := range g.listeners {
g.Add(1)
go l.start()
}
//...忽略udp
// ....
}
代码还是易懂的,整体看就四个部分。
第一部分:初始化 listener
根据 g.network
的值(如 “unix”, “tcp”, “tcp4”, “tcp6”),为每个要监听的地址(g.addrs
)创建一个新的 poller
。这里的 poller
主要用于管理监听套接字上的事件。如果创建 poller
时出错,将停止之前创建的所有监听器并返回错误。
第二部分:初始化一定数量的 poller
根据pollerNum创建对应个数的worker poller。这些 poller
用于处理已连接套接字上的读/写事件。如果在创建过程中遇到错误,将停止所有监听器和之前创建的工作 poller
,然后返回错误。
第三部分:启动所有的 worker poller
为每个工作 poller
分配一个读缓冲区(由 g.readBufferSize
决定大小),并发地启动这些 poller
。
第四部分:启动所有的 listener
启动所有之前创建的监听器。开始监听对应地址的连接请求。
至于poller的启动,
func (p *poller) start() {
defer p.g.Done()
//...
if p.isListener {
p.acceptorLoop()
} else {
defer func() {
syscall.Close(p.epfd)
syscall.Close(p.evtfd)
}()
p.readWriteLoop()
}
}
分为两种,如果是一个listener poller,
func (p *poller) acceptorLoop() {
//如果想要当前g不被调度到其他的操作线程。
if p.g.lockListener {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
p.shutdown = false
for !p.shutdown {
conn, err := p.listener.Accept()
if err == nil {
var c *Conn
c, err = NBConn(conn)
if err != nil {
conn.Close()
continue
}
//
p.g.pollers[c.Hash()%len(p.g.pollers)].addConn(c)
} else {
var ne net.Error
if ok := errors.As(err, &ne); ok && ne.Timeout() {
logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index)
time.Sleep(time.Second / 20)
} else {
if !p.shutdown {
logging.Error("NBIO[%v][%v_%v] Accept failed: %v, exit...", p.g.Name, p.pollType, p.index, err)
}
break
}
}
}
}
listener poller 就是等待新连接,然后通过NBConn封装成nbio conn结构,最后通过取模操作获取其中一个woker poller。把连接加入到对应的poller中。
func (p *poller) addConn(c *Conn) {
c.p = p
if c.typ != ConnTypeUDPServer {
p.g.onOpen(c)
}
fd := c.fd
p.g.connsUnix[fd] = c
err := p.addRead(fd)
if err != nil {
p.g.connsUnix[fd] = nil
c.closeWithError(err)
logging.Error("[%v] add read event failed: %v", c.fd, err)
}
}
}
这里一个有趣的设计,在管理conns上,结构是slice,作者直接使用的conn的fd来作为下标。
这样还是有好处的,
- 连接超多的情况下,GC的时候负担会比map小。
- 能防止串号问题。
最后通过调用addRead把对应的conn fd加入到epoll。
func (p *poller) addRead(fd int) error {
switch p.g.epollMod {
case EPOLLET:
return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLET})
default:
return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN})
}
}
这里没有注册写事件是合理的,因为在新连接上还没有收到任何数据,所以暂时没有需要发送的数据。这种做法可以避免一些不必要的系统调用,从而提高程序的性能。
如果是worker poller的启动,它的工作就是等待加入的那些conns的事件到来,进行对应的处理。
func (p *poller) readWriteLoop() {
//....
msec := -1
events := make([]syscall.EpollEvent, 1024)
// ......
for !p.shutdown {
n, err := syscall.EpollWait(p.epfd, events, msec)
if err != nil && !errors.Is(err, syscall.EINTR) {
return
}
if n <= 0 {
msec = -1
// runtime.Gosched()
continue
}
msec = 20
// 遍历事件
for _, ev := range events[:n] {
fd := int(ev.Fd)
switch fd {
case p.evtfd:
default:
c := p.getConn(fd)
if c != nil {
if ev.Events&epollEventsError != 0 {
c.closeWithError(io.EOF)
continue
}
// 说明可写 刷数据
if ev.Events&epollEventsWrite != 0 {
c.flush()
}
//读事件
if ev.Events&epollEventsRead != 0 {
if p.g.onRead == nil {
for i := 0; i < p.g.maxConnReadTimesPerEventLoop; i++ {
buffer := p.g.borrow(c)
rc, n, err := c.ReadAndGetConn(buffer)
if n > 0 {
p.g.onData(rc, buffer[:n])
}
p.g.payback(c, buffer)
// ...
if n < len(buffer) {
break
}
}
} else {
p.g.onRead(c)
}
}
} else {
syscall.Close(fd)
// p.deleteEvent(fd)
}
}
}
}
}
这段代码也很好理解。等待事件到来,遍历对应的事件列表,判断事件类型,相对应的处理。
func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error)
EpollWait中只有msec是可以用户动态修改的,通常情况下,我们主动调用EpollWait都会设置msec=-1,msce=-1会使得函数一直等待,直到至少有一个事件发生,否则的话一直阻塞。这种方法在事件发生较少的情况下非常有用,因为它可以最大限度地减少 CPU占用率。
如果希望尽可能快速响应事件,可以将msec设置为 0。这将使 EpollWait立即返回,不等待任何事件。这种情况下,你的程序可能会更频繁地调用EpollWait,但能够在事件发生后立即处理它们。当然,这就会导致CPU占用率较高。
如果你的程序可以承受一定的延迟,并希望减少 CPU占用率,可以将msec设置为一个正数。这将使得EpollWait在指定的时间内等待事件。如果在这段时间内没有事件发生,函数将返回,你可以选择在稍后再次调用EpollWait。这种方法可以降低 CPU占用率,但可能会导致较长的响应时间。
nbio对应这个值的调整策略是:当事件数量大于0时,msec=20(这个20应该是作者测试后综合考量?)。
字节跳动的netpoll代码是这样的,如果事件数量大于0,会设置msec为0。如果事件小于等于0,设置msec=-1,然后调用Gosched()使得当前Goroutine主动让出P。
var msec = -1
for {
n, err = syscall.EpollWait(epfd, events, msec)
if n <= 0 {
msec = -1
runtime.Gosched()
continue
}
msec = 0
...
}
然而,nbio 中主动切换的代码已经被注释掉了。根据作者在 issue 中的解释,最初他参考了字节的方法加入了主动切换。
但在对 nbio 进行性能测试时,发现加入与不加入主动切换对性能没有明显区别,因此最终决定将其移除。
事件的处理部分,
如果是可读事件,你可以通过内置或者自定义的内存分配器来得到相应的buffer,然后调用ReadAndGetConn读取数据,而不需要每次都申请一遍buffer。
如果是可写事件的话,会调用flush把buffer里未发送的数据发送出去。
func (c *Conn) flush() error {
//.....
old := c.writeBuffer
n, err := c.doWrite(old)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
//.....
}
if n < 0 {
n = 0
}
left := len(old) - n
// 说明没写完,把剩下的存进writeBuffer下次写
if left > 0 {
if n > 0 {
c.writeBuffer = mempool.Malloc(left)
copy(c.writeBuffer, old[n:])
mempool.Free(old)
}
// c.modWrite()
} else {
mempool.Free(old)
c.writeBuffer = nil
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
// 说明写完了,先把conn重置为只有读事件
c.resetRead()
//...
}
c.mux.Unlock()
return nil
}
逻辑也很简单,写多少是多少,写不进去把剩下的重新放入writeBuffer,下轮epollWait触发再写。
如果写完了,那就没数据可写了,重置这个conn的事件为读事件。
主逻辑就差不多是这样了。
等等,我们一开始说一个新连接进来的时候,我们对一个连接只注册了读事件,没注册写事件,写事件是什么时候注册的?
当然是你调用conn.Write的时候,
g := nbio.NewGopher(nbio.Config{
Network: "tcp",
Addrs: []string{":8888"},
MaxWriteBufferSize: 6 * 1024 * 1024,
})
g.OnData(func(c *nbio.Conn, data []byte) {
c.Write(append([]byte{}, data...))
})
当conn的数据到来时,底层读完数据,会回调OnData函数,此时你可以调用Write向对端发送数据,
func (c *Conn) Write(b []byte) (int, error) {
//....
n, err := c.write(b)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
//.....
return n, err
}
if len(c.writeBuffer) == 0 {
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
} else {
// 说明还有数据没写完,加入写事件
c.modWrite()
}
//.....
return n, err
}
func (c *Conn) write(b []byte) (int, error) {
//...
if len(c.writeBuffer) == 0 {
n, err := c.doWrite(b)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
return n, err
}
//.....
left := len(b) - n
//还没写完,剩下的放入writeBuffer
if left > 0 && c.typ == ConnTypeTCP {
c.writeBuffer = mempool.Malloc(left)
copy(c.writeBuffer, b[n:])
c.modWrite()
}
return len(b), nil
}
// 如果本身writeBuffer还有数据没写入,那新的数据也append进来
c.writeBuffer = mempool.Append(c.writeBuffer, b...)
return len(b), nil
}
当数据没写完,就把剩余数据放入到writeBuffer,这样就会触发执行modWrite,就会把conn的写事件注册到epoll中。
总结
相较于evio,nbio不会出现惊群效应。
evio是通过不断无效的唤醒epoll,来达到逻辑的正确性。而nbio是尽可能的减少系统调用,减少无谓的开销。
易用性上,nbio实现了标准库net.Conn,同时很多设置可配置化,用户可以自由定制,自由度较高。
读写上会使用预先分配的buffer,提高应用性能。
总之,nbio是一款不错的高性能非阻塞的网络框架。