• 8.4 Zinx-V0.8代码实现

    8.4 Zinx-V0.8代码实现

    好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()方法中,在服务端Accept之前,启动Worker工作池。

    zinx/znet/server.go

    1. //开启网络服务
    2. func (s *Server) Start() {
    3. //...
    4. //开启一个go去做服务端Linster业务
    5. go func() {
    6. //0 启动worker工作池机制
    7. s.msgHandler.StartWorkerPool()
    8. //1 获取一个TCP的Addr
    9. addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
    10. if err != nil {
    11. fmt.Println("resolve tcp addr err: ", err)
    12. return
    13. }
    14. //...
    15. //...
    16. }
    17. }()
    18. }

    其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。

    所以应该在Connection的StartReader()方法中修改:

    zinx/znet/connection.go

    1. /*
    2. 读消息Goroutine,用于从客户端中读取数据
    3. */
    4. func (c *Connection) StartReader() {
    5. fmt.Println("Reader Goroutine is running")
    6. defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
    7. defer c.Stop()
    8. for {
    9. // 创建拆包解包的对象...
    10. //读取客户端的Msg head...
    11. //拆包,得到msgid 和 datalen 放在msg中...
    12. //根据 dataLen 读取 data,放在msg.Data中...
    13. //得到当前客户端请求的Request数据
    14. req := Request{
    15. conn:c,
    16. msg:msg,
    17. }
    18. if utils.GlobalObject.WorkerPoolSize > 0 {
    19. //已经启动工作池机制,将消息交给Worker处理
    20. c.MsgHandler.SendMsgToTaskQueue(&req)
    21. } else {
    22. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
    23. go c.MsgHandler.DoMsgHandler(&req)
    24. }
    25. }
    26. }

    这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。