• 8.2 创建及启动Worker工作池

    8.2 创建及启动Worker工作池

    现在添加Worker工作池,先定义一些启动工作池的接口

    zinx/ziface/imsghandler.go

    1. /*
    2. 消息管理抽象层
    3. */
    4. type IMsgHandle interface{
    5. DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
    6. AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
    7. StartWorkerPool() //启动worker工作池
    8. SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
    9. }

    zinx/znet/msghandler.go

    1. //启动一个Worker工作流程
    2. func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
    3. fmt.Println("Worker ID = ", workerID, " is started.")
    4. //不断的等待队列中的消息
    5. for {
    6. select {
    7. //有消息则取出队列的Request,并执行绑定的业务方法
    8. case request := <-taskQueue:
    9. mh.DoMsgHandler(request)
    10. }
    11. }
    12. }
    13. //启动worker工作池
    14. func (mh *MsgHandle) StartWorkerPool() {
    15. //遍历需要启动worker的数量,依此启动
    16. for i:= 0; i < int(mh.WorkerPoolSize); i++ {
    17. //一个worker被启动
    18. //给当前worker对应的任务队列开辟空间
    19. mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
    20. //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
    21. go mh.StartOneWorker(i, mh.TaskQueue[i])
    22. }
    23. }

    StartWorkerPool()方法是启动Worker工作池,这里根据用户配置好的WorkerPoolSize的数量来启动,然后分别给每个Worker分配一个TaskQueue,然后用一个goroutine来承载一个Worker的工作业务。

    StartOneWorker()方法就是一个Worker的工作业务,每个worker是不会退出的(目前没有设定worker的停止工作机制),会永久的从对应的TaskQueue中等待消息,并处理。