evio原理解析~有彩蛋

之前分析过go自带的netpoll,以及自建的网络框架gnet。

当然这类框架还有:evio、gev、nbio、cloudwego/netpoll(字节的)。

为什么会出现这么多自建框架?

我觉得逃不过三点,

  • 自带的netpoll满足不了一些特殊场景。

  • 其他实现设计存在局限性,存在优化空间。

  • 程序员都喜欢自己造轮子。

另外,这类框架都是基于syscall epoll实现的事件驱动框架。主要区别我觉得在于,

  • 对连接conn的管理
  • 对读写数据管理

带着这些问题,我打算把这些框架都看一遍。学习里面优秀的设计以及对比他们的不同点,可以的话,做个整体的性能测试。

这几个框架中,evio是最早的开源实现,开源于2017年。

有意思的是,看到几篇文章说evio存在当loopWrite在内核缓冲区满,无法一次写入时,会出现写入数据丢失的bug。

仔细阅读了代码,evio并不存在这个bug。也不存在是作者后来修复了这个bug,而是evio本身不存在这个bug。

下面会说明。

原理解析

根据代码画了个简易架构图说明evio架构。

简单解释一下,evio启动的时候可以指定loops个数,即多少个epoll实例。同时可以启动多个监听地址,比如图中监听了两个端口。

程序会把每个Listener fd加入到每个epoll并注册这些fd的读事件。每个epoll会开启一个goroutine等待事件到来。

当客户端发起对应端口连接,程序会根据策略选择一个epoll,并把conn fd 也加入到此epoll并注册读写事件。

当一个conn fd读事件ready,那么对应的epoll会被唤醒,然后执行相应的操作。

以上就是整理的流程,接下来我们来深入一些细节。

在此之前,根据上面所描述的,需要先提几个问题,

  • 当一个新的客户端连接到来时,会发生什么?
  • 读写数据是如何流动的?
  • 同一个epoll里多个fd读写事件ready,程序是如何处理的?

看完下面,再回来回答这三个问题。

代码细节

运行一个简单demo,

package main

import (
	"fmt"
	"github.com/tidwall/evio"
	"log"
)

func main() {
	var events evio.Events
	events.NumLoops = 3
	events.Serving = func(srv evio.Server) (action evio.Action) {
		log.Printf("echo server started")
		return
	}
	events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
		fmt.Println("receiver data:", string(in))
		out = in
		return
	}
	log.Fatal(evio.Serve(events, "tcp://:8089", "tcp://:8088"))
}

我们指定NumLoops的数量是3,然后传入了两个地址。

上面还有两个闭包函数,当服务启动的时候会回调events.Serving函数,然后返回一个Action。

const (
	// None indicates that no action should occur following an event.
	None Action = iota
	// Detach detaches a connection. Not available for UDP connections.
	Detach
	// Close closes the connection.
	Close
	// Shutdown shutdowns the server.
	Shutdown
)

比如当你返回一个Shutdown的action,那么程序就直接退出了。

当有客户端数据到来时,回调events.Data函数,返回out和action,out表示要发送的数据。

最终调用 evio.Serve函数,传入两个地址,启动服务。

func Serve(events Events, addr ...string) error {
	var lns []*listener
	defer func() {
		for _, ln := range lns {
			ln.close()
		}
	}()
	for _, addr := range addr {
		var ln listener
		ln.network, ln.addr, ln.opts, _= parseAddr(addr)
		
		if ln.network == "unix" {
			os.RemoveAll(ln.addr)
		}
		var err error
		if ln.network == "udp" {
			if ln.opts.reusePort {
				ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)
			} else {
				ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
			}
		} else {
			if ln.opts.reusePort {
				ln.ln, err = reuseportListen(ln.network, ln.addr)
			} else {
				ln.ln, err = net.Listen(ln.network, ln.addr)
			}
		}
		if err != nil {
			return err
		}
		if ln.pconn != nil {
			ln.lnaddr = ln.pconn.LocalAddr()
		} else {
			ln.lnaddr = ln.ln.Addr()
		}
		lns = append(lns, &ln)
	}
	return serve(events, lns)
}

我删除了一些无关代码。

Serve 函数里面遍历传入的地址识别协议,执行对应listen操作。udp返回一个PacketConn,而tcp返回一个Listener。最终用自定义的listener统一表示。最终调用serve。

func serve(events Events, listeners []*listener) error {
	// figure out the correct number of loops/goroutines to use.
	numLoops := events.NumLoops
	if numLoops <= 0 {
		if numLoops == 0 {
			numLoops = 1
		} else {
			numLoops = runtime.NumCPU()
		}
	}

	s := &server{}
	s.events = events
	s.lns = listeners
	s.cond = sync.NewCond(&sync.Mutex{})
	s.balance = events.LoadBalance
	s.tch = make(chan time.Duration)

	//println("-- server starting")
	if s.events.Serving != nil {
		var svr Server
		svr.NumLoops = numLoops
		svr.Addrs = make([]net.Addr, len(listeners))
		for i, ln := range listeners {
			svr.Addrs[i] = ln.lnaddr
		}
		action := s.events.Serving(svr)
		switch action {
		case None:
		case Shutdown:
			return nil
		}
	}

	defer func() {
		// wait on a signal for shutdown
		s.waitForShutdown()

		// notify all loops to close by closing all listeners
		for _, l := range s.loops {
			l.poll.Trigger(errClosing)
		}

		// wait on all loops to complete reading events
		s.wg.Wait()

		// close loops and all outstanding connections
		for _, l := range s.loops {
			for _, c := range l.fdconns {
				loopCloseConn(s, l, c, nil)
			}
			l.poll.Close()
		}
		//println("-- server stopped")
	}()

	// create loops locally and bind the listeners.
	for i := 0; i < numLoops; i++ {
		l := &loop{
			idx:     i,
			poll:    internal.OpenPoll(),
			packet:  make([]byte, 0xFFFF),
			fdconns: make(map[int]*conn),
		}
		for _, ln := range listeners {
			l.poll.AddRead(ln.fd)
		}
		s.loops = append(s.loops, l)
	}
	// start loops in background
	s.wg.Add(len(s.loops))
	for _, l := range s.loops {
		go loopRun(s, l)
	}
	return nil
}

这个函数逻辑:

1.先初始化一个自定义server结构,确定负载均衡算法。

2.回调自定义的serving闭包函数。

3.根据numloops值,创建对应数量的epoll封装在自定义结构loop中。并把每一个listener对应的fd加入到每一个epoll同时注册fd的读事件,

for _, ln := range listeners {
			l.poll.AddRead(ln.fd)
		}

func (p *Poll) AddRead(fd int) {
	if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_ADD, fd,
		&syscall.EpollEvent{Fd: int32(fd),
			Events: syscall.EPOLLIN, //读事件
		},
	); err != nil {
		panic(err)
	}
}

4.遍历loops,每个loop都用一个g执行loopRun函数。

至于loopRun函数,

func loopRun(s *server, l *loop) {
	defer func() {
		//fmt.Println("-- loop stopped --", l.idx)
		s.signalShutdown()
		s.wg.Done()
	}()

	if l.idx == 0 && s.events.Tick != nil {
		go loopTicker(s, l)
	}

	//fmt.Println("-- loop started --", l.idx)
	l.poll.Wait(func(fd int, note interface{}) error {
		if fd == 0 {
			return loopNote(s, l, note)
		}
		c := l.fdconns[fd]
		switch {
		case c == nil:
			return loopAccept(s, l, fd)
		case !c.opened:
			return loopOpened(s, l, c)
		case len(c.out) > 0:
			return loopWrite(s, l, c)
		case c.action != None:
			return loopAction(s, l, c)
		default:
			return loopRead(s, l, c)
		}
	})
}

每个loop都会调用epoll.Wait函数阻塞等待事件到来,参数时一个闭包函数,每一个到达的事件都会回调此闭包执行相应操作。

func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
	events := make([]syscall.EpollEvent, 64)
	for {
    // 等待事件
		n, err := syscall.EpollWait(p.fd, events, 100)
		if err != nil && err != syscall.EINTR {
			return err
		}
		if err := p.notes.ForEach(func(note interface{}) error {
			return iter(0, note)
		}); err != nil {
			return err
		}
		for i := 0; i < n; i++ {
      // 正常的客户端fd
			if fd := int(events[i].Fd); fd != p.wfd {
				if err := iter(fd, nil); err != nil {
					return err
				}
			} else if fd == p.wfd {
				var data [8]byte
				syscall.Read(p.wfd, data[:])
			}
		}
	}
}

回到上一步,

l.poll.Wait(func(fd int, note interface{}) error {
		if fd == 0 {
			return loopNote(s, l, note)
		}
		c := l.fdconns[fd]
		switch {
		case c == nil:
			return loopAccept(s, l, fd)
		case !c.opened:
			return loopOpened(s, l, c)
		case len(c.out) > 0:
			return loopWrite(s, l, c)
		case c.action != None:
			return loopAction(s, l, c)
		default:
			return loopRead(s, l, c)
		}
	})

注意这里是每个loop都会调用自己的epoll.Wait。当对应的Listener来了一个新客户端连接,所有的epoll都会被“惊醒”,这就是惊群效应。

惊群效应(thundering herd)是指多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么它就会唤醒等待的所有进程(或者线程)

然后所有的loop都会执行loopAccept函数,

func loopAccept(s *server, l *loop, fd int) error {
   for i, ln := range s.lns {
      // 确实是哪个listener fd 事件
      if ln.fd == fd {
         if len(s.loops) > 1 {
            switch s.balance {
            case LeastConnections:
               n := atomic.LoadInt32(&l.count)
               for _, lp := range s.loops {
                  // 对比其他的loop,有比自己连接数少的,自己就不连接了
                  if lp.idx != l.idx {
                     if atomic.LoadInt32(&lp.count) < n {
                        return nil // do not accept
                     }
                  }
               }
            case RoundRobin:
               idx := int(atomic.LoadUintptr(&s.accepted)) % len(s.loops)
               //还没轮到我,也不能连接
               if idx != l.idx {
                  return nil // do not accept
               }
               atomic.AddUintptr(&s.accepted, 1)
            }
         }

         // udp
         if ln.pconn != nil {
            return loopUDPRead(s, l, i, fd)
         }

         nfd, sa, err := syscall.Accept(fd)
         if err != nil {
            if err == syscall.EAGAIN {
               return nil
            }
            return err
         }
         //设置 conn fd 为非阻塞
         if err := syscall.SetNonblock(nfd, true); err != nil {
            return err
         }
         c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
         c.out = nil
         l.fdconns[c.fd] = c
         //把conn fd 注册到此epoll,且监听读写事件
         l.poll.AddReadWrite(c.fd)
         atomic.AddInt32(&l.count, 1)
         break
      }
   }
   return nil
}

loopAccept做了几件事:

1.确定就绪的fd是哪个listener

2.如果loops大于1,通过策略选择其中一个loop,包装conn fd且设置conn fd为非阻塞(思考下如果让conn fd保持阻塞状态,会影响到什么?)

3.最后把这个conn fd加入到当前epoll并且注册读写事件

因此当一个新的客户端连接到来时,会发生什么?

会产生惊群效应。

那如果是已存在的conn fd的可读事件,会发生惊群效应吗?

不会,因为一个conn fd只会加入到其中一个epoll中。

因为evio创建epoll的时候默认是水平触发LT(level-triggered),当加入的conn fd包含写事件时,如果此时内核写缓冲区空间未满,那么epoll会再次被唤醒。此时通过f.fdconns[fd]会找到对应的conn,由于连接初始化并未设置opened的直,因此会进入loopOpened函数。

func loopOpened(s *server, l *loop, c *conn) error {
	c.opened = true
	c.addrIndex = c.lnidx
	c.localAddr = s.lns[c.lnidx].lnaddr
	c.remoteAddr = internal.SockaddrToAddr(c.sa)
	if s.events.Opened != nil {
		out, opts, action := s.events.Opened(c)
		if len(out) > 0 {
			c.out = append([]byte{}, out...)
		}
		c.action = action
		c.reuse = opts.ReuseInputBuffer
		if opts.TCPKeepAlive > 0 {
			if _, ok := s.lns[c.lnidx].ln.(*net.TCPListener); ok {
				internal.SetKeepAlive(c.fd, int(opts.TCPKeepAlive/time.Second))
			}
		}
	}
	// 没有可写的数据,并且也未设置连接的action,重新修改此conn只有read事件
	if len(c.out) == 0 && c.action == None {
		l.poll.ModRead(c.fd)
	}
	return nil
}

无非是一些简单赋值操作,如果设置了Opened闭包函数,那么回调它。

最后如果out没有可发送的事件,那么就重新把此conn fd修改成读事件。

想象一下,如果在没有可写数据的情况下,加入epoll的conn fd注册含有写事件,那么只要内核写缓存区未满,此epoll会不断被唤醒,我称它是空转。

如果上面你设置了Opened闭包函数且最终action设置了值,那么就还是保持此conn fd的读写事件。

这时候再次唤醒的epoll会执行loopAction,

func loopAction(s *server, l *loop, c *conn) error {
   switch c.action {
   default:
      c.action = None
   case Close:
      return loopCloseConn(s, l, c, nil)
   case Shutdown:
      return errClosing
   case Detach:
      return loopDetachConn(s, l, c, nil)
   }
   if len(c.out) == 0 && c.action == None {
      l.poll.ModRead(c.fd)
   }
   return nil
}

上面代码很简单。

当client发送数据,epoll被唤醒,执行loopRead。

func loopRead(s *server, l *loop, c *conn) error {
   var in []byte
   // 读数据
   n, err := syscall.Read(c.fd, l.packet)
   if n == 0 || err != nil {
      if err == syscall.EAGAIN {
         return nil
      }
      return loopCloseConn(s, l, c, err)
   }
   in = l.packet[:n]
   if !c.reuse {
      in = append([]byte{}, in...)
   }
   // 回调data闭包函数
   if s.events.Data != nil {
      out, action := s.events.Data(c, in)
      c.action = action
      // 表示要发送的数据
      if len(out) > 0 {
         c.out = append(c.out[:0], out...)
      }
   }
   // 说明有数据要发送,重新注册读写事件
   if len(c.out) != 0 || c.action != None {
      l.poll.ModReadWrite(c.fd)
   }
   return nil
}

先读取数据,如果有数据要发送,又会修改conn fd为读写事件。当epoll再次被唤醒,且c.out不为空时,执行loopWrite。

func loopWrite(s *server, l *loop, c *conn) error {
	if s.events.PreWrite != nil {
		s.events.PreWrite()
	}
	n, err := syscall.Write(c.fd, c.out)
	if err != nil {
		if err == syscall.EAGAIN {
			return nil
		}
		return loopCloseConn(s, l, c, err)
	}
	// 说明一次就写完数据了
	if n == len(c.out) {
		// release the connection output page if it goes over page size,
		// otherwise keep reusing existing page.
		// 不让out无限增长空间
		if cap(c.out) > 4096 {
			c.out = nil
		} else {
			// 重置nil,复用
			c.out = c.out[:0]
		}
	} else {
		//没写完,留着后面没写的部分
		c.out = c.out[n:]
	}
	// 全写完了,重新调整为读事件
	if len(c.out) == 0 && c.action == None {
		l.poll.ModRead(c.fd)
	}
	return nil
}

如果一次写完数据,那么说明暂时没有可写的数据了,重新修改conn fd为读事件。

如果一次没写完,那么保留未写完的数据,下次epoll唤醒的时候继续写。

上面提到,一些文章提到,evio存在 loopWrite在内核缓冲区满,无法一次写入时,会出现写入数据丢失的bug。

给的理由是,当内核写缓冲区满了,可数据并未写完。此时另一个conn读事件ready,会执行loopRead。

loopRead有这么一段代码,

if s.events.Data != nil {
		out, action := s.events.Data(c, in)
		c.action = action
		// 表示要发送的数据
		if len(out) > 0 {
      // 之前这里是:c.out = append([]byte{}, out...),效果是一样的
			c.out = append(c.out[:0], out...)
		}
	}

那么此时之前未发送完的数据就会被覆盖,导致数据丢失。

但是我仔细看了代码,并不存在这个bug。

因为如果第一次没写完,假设此时同一个epoll下的另一个conn读事件ready,由于out还有未写完的数据,只会执行loopWrite分支, 并不会走到默认分支loopRead,也就不存在写数据被覆盖导致丢失的问题了。

有趣的是,这样的情况会导致空转。因为执行loopWrite逻辑,由于内核写缓冲区已满,导致写不进去数据,会出现syscall.EAGAIN直接返回。又因此时还有可读的数据没读,会不断唤醒epoll。

调用epoll_wait会陷入内核态,所以会导致不断的在用户态和内核态切换。直到写完数据,才能读其他conn数据。

到这里,核心的代码已经分析完了。

Evio存在的问题

惊群效应

串行化

从上面的分析可以看出,如果一个epoll有两个fd可读事件ready,那么第二个fd必须等第一个执行完毕,才开始执行。

换句话说,如果这个闭包函数里有外部依赖调用,第二个就得一直等。

不能在Data函数里用go func吗?

还真不能,要是这样的话又会涉及到数据并发问题,数据会发生错乱。

同一个epoll下的conn是共享数据结构的,如果使用异步,必然又涉及到锁的问题。

数据copy问题

evio采用的是同步处理buffer数据,直接通过syscall读写操作存在copy开销,这是cpu直接参与的。看字节的netpoll使用的zero-copy的技术,后面再看源码。

频繁唤醒epoll

evio会通过不断修改conn fd的事件来唤醒epoll,达到逻辑上的正确性。频繁唤醒的方式并不是很妥,这种方式是存在开销的。

这篇文章到这里就结束了,分析完go自建netpoll “鼻祖” evio,以及它存在局限性,就可以继续学习后续其他框架的设计了。



gonet

1391 Words

2023-01-06 19:23 +0800