UnLua中实现基于RPC的网络数据收发机制

前言

在 UE 中使用 RPC 来负责与 DS 的通信,但是在使用 RPC 时它是通过封装成函数调用来使用的。如果发送一个复杂结构的数据就需要定义一个结构体才行,如果在蓝图或者 C++ 中使用倒还能接受都是强类型的。但是放到 Lua 中用的话就比较繁琐了,首先要在蓝图中定义 RPC 函数如果是复杂数据结构还要定义对应的结构体,然后再通过反射在 Lua 中创建结构体并调用 RPC 函数实现。所以我通过仿照游戏中常见的使用 protobuffer 来封装数据,通过一个固定的 rpc 函数来承载数据发送。这样既避免了每次要定义 RPC 函数和结构体的操作,又能在这过程中加入一些自定义的机制,比如队列、监听等机制。

使用方法

定义Proto

  1. 新建 proto 文件,定义需要发送消息的数据结构

  2. 需要在 proto 中使用固定 package 名

    // cs_demo.proto
    
    syntax = "proto2";
    package com.tencent.wea.protocol.xxx;
  3. 定义消息结构可参考示例 proto

    // cs_demo.proto
    
    message Demo_Req {
      optional int32 Id = 1;
    }
    
    message Demo_Rsp {
      optional int32 Id = 1;
    }
    
    message Demo_Ntf {
      optional int32 Id = 1;
    }

定义枚举

定义消息枚举并且把枚举和结构绑定

  1. 定义消息枚举,文件为:MessageEnum.lua

    -- 全局访问方法
    _G.MessageEnum.XXX
    
    -- 文件结构
    ---@class MessageEnum
    local MessageEnum = {
        E_NONE = 0,
        -- 示例
        E_DEMO_REQ = 1101, --请求测试
        E_DEMO_RSP = 1102, --请求回复测试
        E_DEMO_NTF = 1103, --主动推送测试
    }
  2. 关联消息结构,文件为::MessageName.lua

    -- 全局访问方法
    _G.MessageName[_G.MessageEnum.XXX]
    
    -- 文件结构
    ---@type MessageEnum
    local MessageEnum = _G.MessageEnum
    
    ---@class MessageName
    local MessageName = {
        [MessageEnum.E_DEMO_REQ] = "Demo_Req",
        [MessageEnum.E_DEMO_RSP] = "Demo_Rsp",
        [MessageEnum.E_DEMO_NTF] = "Demo_Ntf",
    }
  3. 定义消息错误码,路径为

    MessageError.lua
    
    -- 全局访问方法
    _G.MessageError.XX
    
    -- 文件结构
    ---@class MessageError
    local MessageError = {
        E_SUCCESS = 0, --成功
    }

Client

客户端收发消息等操作

  1. 发消息到 DS

    ---发送消息给DS
    ---参数1为消息ID,通过消息枚举获取
    ---参数2为消息体
    ---参数3为消息回复回调,可为空,如果传入回调函数则DS端必须回复。否则后续再发送需要回复的消息会一直排队等待
    ---回包参数1为错误码,通过错误码枚举获取判断
    ---回包参数2为消息体,如DS没有返回消息则为空
    _G.Client.Network:SendMessage(_G.MessageEnum.E_DEMO_REQ, tMessage, function(nErrorCode, tRspMessage)
        Log("GM_DemoRsp", nErrorCode, tRspMessage.Id)
      end)
  2. 注册监听消息

    ---注册监听事件
    ---参数1为消息ID,通过消息枚举获取
    ---参数2为注册的对象
    ---参数3为回调函数
    _G.Client.Network:RegisterMessage(_G.MessageEnum.E_DEMO_NTF, self, self.On_DemoNtf)
    
    ---On_DemoNtf
    ---测试接收消息
    ---@param tMessage Demo_Rsp 消息体
    function XXX:On_DemoNtf(tMessage)
        Log("GM_DemoNtf", tMessage.Id)
    end
    
  3. 取消注册监听

    ---取消注册监听,取消该对象注册的所有监听
    _G.Client.Network:UnRegisterWithScope(self)
    
    ---取消注册监听,取消特定对象的特定消息
    _G.Client.Network:UnRegisterMessage(_G.MessageEnum.E_DEMO_NTF, self)

DS

DS 收发消息等操作

  1. 发送消息到 Client

    ---发送消息
    ---参数1为玩家UID
    ---参数2为消息ID,通过消息枚举获取
    ---参数3为消息体
    ---参数4为是否用不可靠方式下发消息(默认为false)
    _G.DS.Network:SendMessage(10010, _G.MessageEnum.E_DEMO_NTF, tNtfMessage, bUnReliable)
  2. 广播消息到 Client

    ---广播消息,广播给所有玩家
    ---参数1为消息ID,通过消息枚举获取
    ---参数2为消息体
    ---参数3为是否用不可靠方式下发消息(默认为false)
    _G.DS.Network:BroadcastMessage(_G.MessageEnum.E_DEMO_NTF, tNtfMessage, bUnReliable)
  3. 注册监听消息

    ---注册监听事件 (每个消息只能注册一次)
    ---参数1为消息ID,通过消息枚举获取
    ---参数2为注册对象
    ---参数3为回调函数
    _G.DS.Network:RegisterMessage(_G.MessageEnum.E_DEMO_REQ, self, self.On_DemoReq)
    
    ---On_DemoReq
    ---测试接收请求消息
    ---@param nPlayerUID number 玩家UID
    ---@param tMessage Demo_Req 消息结构
    ---@param fReply function 回复方法
    ---@param nRspUID number 回复UID,通常不需要使用,如果出现回复不在该函数处理时需要自行缓存
    function XXX:On_DemoReq(nPlayerUID, tMessage, fReply, nRspUID)
      	---回复消息体
        ---@type Demo_Rsp
        local tRspMessage = {
          Id = tMessage.Id + 100
        }
      	
        ---回复函数(方式一:带消息体)
      	---参数1为回复错误码,通过错误码枚举获取
      	---参数1为回复消息ID,通过消息枚举获取
      	---参数2为回复结构
        fReply(MessageError.E_SUCCESS, _G.MessageEnum.E_DEMO_RSP, tRspMessage)
      
      	---回复函数(方式二:只带错误码)
      	---参数1为回复错误码,通过错误码枚举获取
        fReply(_G.MessageError.E_SUCCESS)
    end
  4. 回复消息到 Client

    ---回复消息
    ---参数1为玩家UID
    ---参数2为回复UID,通过监听消息获取
    ---参数3为回复错误码,通过错误码枚举获取
    ---参数4为回复消息ID,通过消息枚举获取(如果为0或_SP.MessageEnum.E_NONE则不回复消息体)
    ---参数5为回复消息体
    _G.DS.Network:ReplyMessage(10010, nRspUID, nErrorCode, _G.MessageEnum.E_DEMO_NTF, tNtfMessage)
  5. 取消注册监听消息

    ---取消注册监听,取消该消息ID监听的事件
    ---参数1为消息ID
    _G.DS.Network:UnRegisterMessage(nMessageId)
    
    ---取消注册监听,取消该对象所有消息监听
    ---参数1为对象
    _G.DS.Network:UnRegisterWithScope(self)

实现细节

RPC函数定义与调用

新建一个专门发 RPC 的组件,负责使用 RPC 机制收发消息。起名为“URPCCompoent”。挂在每个玩家的 PlayerController 中。

该组件通过 UnLua 的绑定脚本机制,绑定到名为“RPCComponent.lua”的脚本中,该脚本定义通用收发消息的的 RPC 函数。

---@class RPCComponent : URPCCompoent
local RPCComponent = UE4.Class()

-- DS收到消息
function RPCComponent:C2S_ReqMessageWithBytes_Lua(MessageID, RspUID, Message)
    _G.DS.Network:ReceiveMessage(self.UID, MessageID, RspUID, Message)
end

-- Client收到DS回复消息
function RPCComponent:S2C_RspMessageWithBytes_Lua(MessageID, RspUID, ErrorCode, Message)
    if not _G.IsDS then
        _G.Client.Network:RspMessage(MessageID, RspUID, ErrorCode, Message)
    end
end

-- Client收到DS广播消息
function RPCComponent:S2C_NtfMessageWithBytes_Lua(MessageID, Message)
    if not _G.IsDS then
        _G.Client.Network:NtfMessage(MessageID, Message)
    end
end

-- Client收到DS广播不可靠消息
function RPCComponent:S2C_NtfMessageWithBytes_UnReliable_Lua(MessageID, Message)
    if not _G.IsDS then
        _G.Client.Network:NtfMessage(MessageID, Message)
    end
end

return RPCComponent
  1. 定义 Client 向 DS 发送消息的 RPC 函数

    
    // 定义函数
    UFUNCTION(Server, Reliable)
    void C2S_ReqMessage(int32 MessageId, int32 RspUID, const TArray<uint8>& Message);
    // 函数实现
    void URPCCompoent::C2S_ReqMessage_Implementation(int32 MessageId, int32 RspUID, const TArray<uint8>& Message)
    {
      	// 此处逻辑为客户端通过RPC发到DS上,DS接受到消息回调过来的函数。
      	// 由于是要和Lua进行交互,所以此处直接调用Lua函数
      	// 这里不通过反射机制实现是为了增加一些执行效率,如用FString做载体传入lua的话有几率丢失数据,\0问题。如果更好的解法请指教。
    	lua_State* L = UnLua::GetState();	// 
    	if (L)
    	{
    		const int32 Top = lua_gettop(L);
    		lua_pushcfunction(L, UnLua::ReportLuaCallError);								// 添加错误处理函数
    		UnLua::PushUObject(L, this);													// 调用该类绑定的Lua中的函数
    		if (lua_getfield(L, -1, "C2S_ReqMessageWithBytes_Lua") == LUA_TFUNCTION)		// 获取C2S_ReqMessageWithBytes_Lua函数
    		{
    			size_t Size = Message.Num();
    			char* Bytes = static_cast<char*>(FMemory::Malloc(Size * sizeof(char)));
    			ArrayToBytes(Message, Bytes);												// 数组转换为字符串
    			
    			UnLua::PushUObject(L, this);												// 压栈参数
    			lua_pushinteger(L, MessageId);
    			lua_pushinteger(L, RspUID);
    			lua_pushlstring(L, Bytes, Size);
    			lua_pcall(L, 4, 0, Top + 1);												// 调用函数
    
    			FMemory::Free(Bytes);
    		}
    		lua_settop(L, Top);
    	}
    }
  2. 定义 DS 向 Client 发送消息的 RPC 函数

    // 定义函数
    UFUNCTION(Client, Reliable)
    void S2C_NtfMessage(int32 MessageId, const TArray<uint8>& Message);
    // 函数实现
    void URPCCompoent::S2C_NtfMessage_Implementation(int32 MessageId, const TArray<uint8>& Message)
    {
    	lua_State* L = UnLua::GetState();
    	if (L)
    	{
    		const int32 Top = lua_gettop(L);
    		lua_pushcfunction(L, UnLua::ReportLuaCallError);								// 添加错误处理函数
    		UnLua::PushUObject(L, this);													// 调用该类绑定的Lua中的函数
    		if (lua_getfield(L, -1, "S2C_NtfMessageWithBytes_Lua") == LUA_TFUNCTION)		// 获取S2C_NtfMessageWithBytes_Lua函数
    		{
    			size_t Size = Message.Num();
    			char* Bytes = static_cast<char*>(FMemory::Malloc(Size * sizeof(char)));
    			ArrayToBytes(Message, Bytes);												// 数组转换为字符串
    			
    			UnLua::PushUObject(L, this);												// 压栈参数
    			lua_pushinteger(L, MessageId);
    			lua_pushlstring(L, Bytes, Size);
    			lua_pcall(L, 3, 0, Top + 1);												// 调用函数
    			FMemory::Free(Bytes);
    		}
    		lua_settop(L, Top);
    	}
    }
    
    // 定义函数
    UFUNCTION(Client, Reliable)
    void S2C_RspMessage(int32 MessageId, int32 RspUID, int32 ErrorCode, const TArray<uint8>& Message);
    // 函数实现 - 仿照S2C_NtfMessage逻辑即可 新增了ErrorCode参数
    
    // 定义函数
    UFUNCTION(Client, Unreliable)
    void S2C_NtfMessage_UnReliable(int32 MessageId, const TArray<uint8>& Message);
    // 函数实现 - 仿照C2S_ReqMessage逻辑即可
  3. Client 接收 DS 广播消息

    static int32 URPCCompoent_S2C_NtfMessageWithBytes(lua_State* L)
    {
    	const int32 NumParams = lua_gettop(L);
    	if (NumParams < 4)
    		return luaL_error(L, "invalid parameters");
    
    	URPCCompoent* Self = Get(L, 1, UnLua::TType<URPCCompoent*>());
    	if (!IsValid(Self))
    		return luaL_error(L, "self is invalid");
    
    	const int32 MessageId = lua_tointeger(L, 2);
    	const int bUnReliable = lua_toboolean(L, 3);
    
    	TArray<uint8> Message;
    	if (lua_isnil(L, 4) == false)
    	{
    		size_t Size = 0;
    		const char* Bytes = lua_tolstring(L, 4, &Size);
    		BytesToArray(Bytes, Size, Message);
    	}
    
    	if (bUnReliable != 0)
    		Self->S2C_NtfMessage_UnReliable(MessageId, Message);
    	else
    		Self->S2C_NtfMessage(MessageId, Message);
    	
    	return 0;
    }
  4. Client 发送消息到 DS

    static int32 URPCCompoent_C2S_ReqMessageWithBytes(lua_State* L)
    {
    	const int32 NumParams = lua_gettop(L);
    	if (NumParams < 3)
    		return luaL_error(L, "invalid parameters");
    
    	URPCCompoent* Self = Get(L, 1, UnLua::TType<URPCCompoent*>());
    	if (!IsValid(Self))
    		return luaL_error(L, "self is invalid");
    
    	const int32 MessageId = lua_tointeger(L, 2);
    	const int32 RspUID = lua_tointeger(L, 3);
    
    	TArray<uint8> Message;
    	if (lua_isnil(L, 4) == false)
    	{
    		size_t Size = 0;
    		const char* Bytes = lua_tolstring(L, 4, &Size);
    		BytesToArray(Bytes, Size, Message);
    	}
    
    	Self->C2S_ReqMessage(MessageId, RspUID, Message);
    	
    	return 0;
    }
  5. DS 回复 Client 消息

    static int32 URPCCompoent_S2C_RspMessageWithBytes(lua_State* L)
    {
    	const int32 NumParams = lua_gettop(L);
    	if (NumParams < 4)
    		return luaL_error(L, "invalid parameters");
    
    	URPCCompoent* Self = Get(L, 1, UnLua::TType<URPCCompoent*>());
    	if (!IsValid(Self))
    		return luaL_error(L, "self is invalid");
    
    	const int32 MessageId = lua_tointeger(L, 2);
    	const int32 RspUID = lua_tointeger(L, 3);
    	const int32 ErrorCode = lua_tointeger(L, 4);
    
    	TArray<uint8> Message;
    	if (lua_isnil(L, 5) == false)
    	{
    		size_t Size = 0;
    		const char* Bytes = lua_tolstring(L, 5, &Size);
    		BytesToArray(Bytes, Size, Message);
    	}
    
    	Self->S2C_RspMessage(MessageId, RspUID, ErrorCode, Message);
    	
    	return 0;
    }

Lua管理类封装

DS
  1. 基础函数

    -- 获取对应玩家ID的RPCCompoent
    function GetRPCCompoent(nPlayerUID)
      -- 根据PlayerUID获取对应的Controller
      -- 再获取到对应的RPCCompoennt并返回
    end
    
    -- 获取所有玩家的RPCCompoent
    function GetRPCComponentArray()
      -- 获取所有玩家的RPCCompoennt并返回
    end
    
    -- DS初始化时调用
    function DSNetworkManager:Init()
      	-- 初始化必要容器
      	self.tMessageDict = {}
        self.tScopeDict = {}
        self.tWaitArray = {}
      	self.tPbNameDict = {}
      	self.lastUpdateTime = 0
    end
    
    -- DS结束时调用
    function DSNetworkManager:Shutdown()
      	-- 清空必要容器
      	self.tMessageDict = nil
        self.tScopeDict = nil
      	self.tWaitArray = nil
      	self.tPbNameDict = nil
    end
    
    -- 检测回包超时并丢弃
    function DSNetworkManager:Tick(dt)
        local now = math.floor(GetUnixTime())
        if now - self.lastUpdateTime < 1 then
            return
        end
        self.lastUpdateTime = now
    
        for PlayerUID, WaitInfo in pairs(self.tWaitArray) do
            if now - WaitInfo.nTime > 3 then
                Error("CustomTick 回包超时", PlayerUID, WaitInfo.nRspUID, WaitInfo.nMessageId)
                self.tWaitArray[PlayerUID] = nil
            end
        end
    end
    
    -- PBName缓存
    function DSNetworkManager:GetPBNameCache(nMessageId, MessageName)
        local pbName = self.tPbNameDict[nMessageId]
        if pbName then
            return pbName
        end
    
        pbName = PB_HEAD .. MessageName
        self.tPbNameDict[nMessageId] = pbName
        return pbName
    end
  2. 网络消息注册与监听

    ---RegisterMessage
    ---注册消息
    ---@public
    ---@param nMessageId number 消息ID
    ---@param tScope table 注册对象
    ---@param fCallback function 回调函数
    function DSNetworkManager:RegisterMessage(nMessageId, tScope, fCallback)
        if not nMessageId or not tScope or not fCallback then
            Error("RegisterMessage 参数错误", nMessageId, tScope ~= nil, fCallback ~= nil)
            return
        end
    
        local tHandler = self.tMessageDict[nMessageId]
        if tHandler then
            Error("重复注册 MessageId:", nMessageId)
            return
        end
        self.tMessageDict[nMessageId] = {tScope = tScope, fCallback = fCallback}
    
        local tScopeDict = self.tScopeDict[tScope]
        if not tScopeDict then
            tScopeDict = {}
            self.tScopeDict[tScope] = tScopeDict
        end
        tScopeDict[nMessageId] = true
    end
    
    ---UnRegisterMessage
    ---取消注册消息
    ---@public
    ---@param nMessageId number 消息ID
    function DSNetworkManager:UnRegisterMessage(nMessageId)
        if not nMessageId then
            Error("UnRegisterMessage 参数错误", nMessageId)
            return
        end
    
        local tHandler = self.tMessageDict[nMessageId]
        if tHandler then
            local tScope = tHandler.tScope
            local tScopeDict = self.tScopeDict[tScope]
            if tScopeDict then
                tScopeDict[nMessageId] = nil
            end
        end
    
        self.tMessageDict[nMessageId] = nil
    end
    
    ---UnRegisterWithScope
    ---取消注册消息
    ---@public
    ---@param tScope table 注册对象
    function DSNetworkManager:UnRegisterWithScope(tScope)
        if not tScope then
            Error("UnRegisterWithScope tScope为空")
            return
        end
    
        local tScopeDict = self.tScopeDict[tScope]
        if tScopeDict then
            for nMessageId, _ in pairs(tScopeDict) do
                self.tMessageDict[nMessageId] = nil
            end
        end
        self.tScopeDict[tScope] = nil
    end
  3. 消息发送与广播

    ---SendMessage
    ---发送消息
    ---@public
    ---@param nPlayerUID number 玩家UID
    ---@param nMessageId number 消息ID
    ---@param tMessage table 消息结构
    ---@param bUnReliable boolean 不可靠
    function DSNetworkManager:SendMessage(nPlayerUID, nMessageId, tMessage, bUnReliable)
        if not nPlayerUID or not nMessageId or not tMessage then
            Error("SendMessage: 获取参数失败", nPlayerUID, nMessageId, tMessage)
            return
        end
        local MessageName = MESSAGE_NAME[nMessageId]
        if not MessageName then
            Error("SendMessage: 没有找到对应的MessageName MessageId:", nMessageId)
            return
        end
        if type(tMessage) ~= "table" then
            Error("SendMessage: tMessage不为table MessageId:", nMessageId)
            return
        end
        ---@type URPCComponent
        local RPCComponent = GetRPCCompoent(nPlayerUID)
        if not RPCComponent then
            Warning("SendMessage: 获取RPCComponent失败:", nPlayerUID)
            return
        end
    
        local pbName = self:GetPBNameCache(nMessageId, MessageName)
        local bytes = pb.encode(pbName, tMessage)
        RPCComponent:S2C_NtfMessageWithBytes(nMessageId, bUnReliable, bytes)
    end
    
    ---BroadcastMessage
    ---广播消息
    ---@public
    ---@param nMessageId number 消息ID
    ---@param tMessage table 消息结构
    ---@param bUnReliable boolean 不可靠
    function DSNetworkManager:BroadcastMessage(nMessageId, tMessage, bUnReliable)
      	local MessageName = MESSAGE_NAME[nMessageId]
        if not MessageName then
          Error("BroadcastMessage: 没有找到对应的MessageName MessageId:", nMessageId)
          return
        end
        if type(tMessage) ~= "table" then
          Error("BroadcastMessage: tMessage不为table MessageId:", nMessageId)
          return
        end
      	local RPCComponentArray = GetRPCComponentArray()
        local pbName = self:GetPBNameCache(nMessageId, MessageName)
        local bytes = pb.encode(pbName, tMessage)
        for i = 1, RPCComponentArray:Length() do
          ---@type URPCComponent
            local RPCComponent = RPCComponentArray:Get(i)
            if RPCComponent then
              RPCComponent:S2C_NtfMessageWithBytes(nMessageId, bUnReliable, bytes)
            end
        end
    end
  4. 消息回复

    ---ReplyMessage
    ---回复消息
    ---@public
    ---@param nPlayerUID number 玩家UID
    ---@param nRspUID number 回复UID
    ---@param nErrorCode number 错误码
    ---@param nMessageId number 消息ID
    ---@param tMessage table 消息结构
    function DSNetworkManager:ReplyMessage(nPlayerUID, nRspUID, nErrorCode, nMessageId, tMessage)
        if not nPlayerUID or not nRspUID or not nErrorCode then
            Error("ReplyMessage: 获取参数失败", nPlayerUID, nRspUID, nErrorCode)
            return
        end
        ---@type URPCComponent
        local RPCComponent = GetRPCCompoent(nPlayerUID)
        if not RPCComponent then
            Warning("ReplyMessage: 获取RPCComponent失败:", nPlayerUID)
            return
        end
        if nErrorCode ~= _SP.MessageError.E_SUCCESS then
            Warning("ReplyMessage: 错误码错误:", nPlayerUID, nMessageId, nErrorCode)
        end
    
        local MessageName = ""
        if nMessageId then
            MessageName = MESSAGE_NAME[nMessageId]
            if not MessageName then
                Error("ReplyMessage: 没有找到对应的MessageName MessageId:", nMessageId)
                return
            end
            if type(tMessage) ~= "table" then
                Error("ReplyMessage: tMessage不为table MessageId:", nMessageId)
                return
            end
            local pbName = self:GetPBNameCache(nMessageId, MessageName)
            local bytes = pb.encode(pbName, tMessage)
            RPCComponent:S2C_RspMessageWithBytes(nMessageId, nRspUID, nErrorCode, bytes)
        else
            RPCComponent:S2C_RspMessageWithBytes(0, nRspUID, nErrorCode)
        end
    end
  5. 消息接收

    ---ReceiveMessage
    ---接收消息
    ---@private
    ---@param nPlayerUID number 玩家UID
    ---@param nRspUID number 回复UID
    ---@param nMessageId number 消息ID
    ---@param Message table 消息结构
    function DSNetworkManager:ReceiveMessage(nPlayerUID, nMessageId, nRspUID, Message)
        if not nPlayerUID or not nMessageId or not nRspUID or not Message then
            Error("ReceiveMessage: 参数错误", nPlayerUID, nMessageId, nRspUID)
            return
        end
    
        local nStart = os.clock()
        local MessageName = MESSAGE_NAME[nMessageId]
        if not MessageName then
            Error("ReceiveMessage: 没有找到对应的MessageName MessageId:", nMessageId)
            return
        end
    
        if nRspUID > 0 then
            if nPlayerUID ~= 0 and self.tWaitArray[nPlayerUID] == nil then
                local nNow = math.floor(GetUnixTime())
                self.tWaitArray[nPlayerUID] = {
                    nRspUID = nRspUID,
                    nMessageId = nMessageId,
                    nTime = nNow,
                    nClockTime = nStart,
                }
            else
                Warning("ReceiveMessage: 客户端重复发送请求或用户不存在", nPlayerUID, nMessageId)
            end
        end
    
        local fReply = function(nErrorCode, nReplyMessageId, tMessage)
            if nRspUID > 0 then
                self:ReplyMessage(nPlayerUID, nRspUID, nErrorCode, nReplyMessageId, tMessage)
    
                if nPlayerUID ~= 0 and self.tWaitArray[nPlayerUID] ~= nil then
                    local Info = self.tWaitArray[nPlayerUID]
                    self.tWaitArray[nPlayerUID] = nil
                else
                    Warning("ReceiveMessage: 客户端回包队列请求不存在或用户不存在", nPlayerUID, nMessageId)
                end
            else
                Warning("ReceiveMessage: 客户端没有接收该回复", nMessageId)
            end
        end
    
        local pbName = self:GetPBNameCache(nMessageId, MessageName)
        local tMessage = pb.decode(pbName, Message)
        local tHandler = self.tMessageDict[nMessageId]
        if tHandler then
            local tScope = tHandler.tScope
            local fCallback = tHandler.fCallback
            if tScope and fCallback then
                xpcall(fCallback, _G.ErrorHandler, tScope, nPlayerUID, tMessage, fReply, nRspUID)
            end
        end
    end
Client

基础函数

-- 获取自己的RPCCompoent
function GetRPCCompoent()
  -- 获取自己的Controller
  -- 再获取到对应的RPCCompoennt并返回
end

-- 客户端初始化
function ClientNetworkManager:Init()
    self.tMessageDict = {}
    self.tScopeDict = {}
    self.tPbNameDict = {}

    self:ClearWaitArray(false)
end

-- 客户端结束
function ClientNetworkManager:Shutdown()
    self.tMessageDict = {}
    self.tScopeDict = {}
    self.tPbNameDict = {}
end

-- 清理等待列表
function ClientNetworkManager:ClearWaitArray(timeout)
    local nArrayLength = self.tWaitArray == nil and 0 or #self.tWaitArray
    if timeout then
        for i = 1, nArrayLength do
            local tWait = self.tWaitArray[i]
            if tWait and tWait.fResponse then
                Warning("ClearWaitArray rpc call timeout", tWait.nMessageId, tWait.RspUID)
                xpcall(tWait.fResponse, _G.ErrorHandler, _SP.MessageError.E_RPC_CALL_TIMEOUT)
            end
        end
    end
    self.nCount = 0
    self.tWaitArray = {}
end

-- 获取消息自增ID
function ClientNetworkManager:GetMessageUID()
    if self.nCount == math.maxinteger then
        self.nCount = 0
    end
    self.nCount = self.nCount + 1
    return self.nCount
end

-- 缓存PBName
function ClientNetworkManager:GetPBNameCache(nMessageId, MessageName)
    local pbName = self.tPbNameDict[nMessageId]
    if pbName then
        return pbName
    end

    pbName = PB_HEAD .. MessageName
    self.tPbNameDict[nMessageId] = pbName
    return pbName
end
  1. 网络消息注册与监听

    ---RegisterMessage
    ---注册消息
    ---@public
    ---@param nMessageId number 消息ID
    ---@param tScope table 注册对象
    ---@param fCallback function 回调函数
    function ClientNetworkManager:RegisterMessage(nMessageId, tScope, fCallback)
        if not nMessageId or not tScope or not fCallback then
            Error("RegisterMessage 参数错误", nMessageId, tScope ~= nil, fCallback ~= nil)
            return
        end
    
        local tScopeDict = self.tScopeDict[tScope]
        if not tScopeDict then
            tScopeDict = {}
            self.tScopeDict[tScope] = tScopeDict
        end
        if tScopeDict[nMessageId] then
            Error("RegisterMessage, 同Scope重复注册 MessageId:", nMessageId)
            return
        end
        tScopeDict[nMessageId] = true
    
        local tHandlerDict = self.tMessageDict[nMessageId]
        if not tHandlerDict then
            tHandlerDict = {}
            self.tMessageDict[nMessageId] = tHandlerDict
        end
        tHandlerDict[tScope] = fCallback
    end
    
    ---UnRegisterMessage
    ---取消注册消息
    ---@public
    ---@param nMessageId number 消息ID
    ---@param tScope table 注册对象
    function ClientNetworkManager:UnRegisterMessage(nMessageId, tScope)
        if not nMessageId or not tScope then
            Error("UnRegisterMessage 参数错误", nMessageId, tScope ~= nil)
            return
        end
    
        local tScopeDict = self.tScopeDict[tScope]
        if tScopeDict then
            tScopeDict[nMessageId] = nil
        end
    
        local tHandlerDict = self.tMessageDict[nMessageId]
        if tHandlerDict then
            tHandlerDict[tScope] = nil
        end
    end
    
    ---UnRegisterWithScope
    ---取消注册消息
    ---@public
    ---@param tScope table 环境对象
    function ClientNetworkManager:UnRegisterWithScope(tScope)
        if not tScope then
            Error("UnRegisterWithScope tScope为空")
            return
        end
    
        local tScopeDict = self.tScopeDict[tScope]
        if tScopeDict then
            for nMessageId, _ in pairs(tScopeDict) do
                local tHandlerDict = self.tMessageDict[nMessageId]
                tHandlerDict[tScope] = nil
            end
        end
        self.tScopeDict[tScope] = nil
    end
  2. 发送消息

    ---SendMessage
    ---发送消息
    ---@public
    ---@param nMessageId number 消息ID
    ---@param tMessage table 消息结构
    ---@param fResponse function 消息回复
    function ClientNetworkManager:SendMessage(nMessageId, tMessage, fResponse)
        if not nMessageId or not tMessage then
            Error("SendMessage: 参数错误", nMessageId, tMessage ~= nil)
            return
        end
    
        local MessageName = MESSAGE_NAME[nMessageId]
        if not MessageName then
            Error("SendMessage: 没有找到对应的MessageName MessageId:", nMessageId)
            return
        end
    
        if type(tMessage) ~= "table" then
            Error("SendMessage: tMessage不为table MessageId:", nMessageId)
            return
        end
    
        ---@type URPCComponent
        local RPCComponent = GetRPCCompoent()
        if not RPCComponent then
            Warning("SendMessage: 没找到RPCComponent")
            return
        end
    
        local pbName = self:GetPBNameCache(nMessageId, MessageName)
        local bytes = pb.encode(pbName, tMessage)
        local BytesArrayNum = string.len(bytes)
        if fResponse then
            local RspUID = self:GetMessageUID()
            local now = math.floor(GetUnixTime())
            Log("SendMessage", "Server Time:", now)
            local nWaitLength = #self.tWaitArray
            -- 如果出现超时情况,清空当前等待队列所有请求,只发送当前请求
            if nWaitLength > 0 then
                local tWait = self.tWaitArray[1]
                if now - tWait.tTime > 10 then
                    Warning("SendMessage Timeout", tWait.nMessageId, tWait.RspUID, nWaitLength, string.len(tWait.bytes), "byte")
                    self:ClearWaitArray(true)
                    nWaitLength = 0
                end
            end
            table.insert(self.tWaitArray, {
                RspUID = RspUID,
                nMessageId = nMessageId,
                bytes = bytes,
                fResponse = fResponse,
                tTime = now,
            })
            nWaitLength = nWaitLength + 1
            if nWaitLength == 1 then
                RPCComponent:C2S_ReqMessageWithBytes(nMessageId, RspUID, bytes)
                Log("SendMessage", nMessageId, RspUID, MessageName, BytesArrayNum, "byte")
            else
                local nTopIndex = next(self.tWaitArray)
                local nWaitTop = 0
                if nTopIndex then
                    nWaitTop = self.tWaitArray[nTopIndex].nMessageId
                end
                Warning("SendMessage", nMessageId, RspUID, MessageName, BytesArrayNum, "byte", "WaitTop:", nWaitTop, "WaitLength:", nWaitLength)
            end
        else
            RPCComponent:C2S_ReqMessageWithBytes(nMessageId, 0, bytes)
            Log("SendMessage", nMessageId, 0, MessageName, BytesArrayNum, "byte")
        end
    end
  3. 接收消息

    ---NtfMessage
    ---收到推送消息
    ---@private
    ---@param nMessageId number
    ---@param Message table
    function ClientNetworkManager:NtfMessage(nMessageId, Message)
        if not nMessageId or not Message then
            Error("ReceiveMessage: 参数错误", nMessageId, Message ~= nil)
            return
        end
    
        local MessageName = MESSAGE_NAME[nMessageId]
        if not MessageName then
            Error("NtfMessage: 没有找到对应的MessageName MessageId:", nMessageId)
            return
        end
    
        if not self.tMessageDict then
            Error("NtfMessage: 没有初始化就收到了消息推送 MessageId:", nMessageId)
            return
        end
    
        local pbName = self:GetPBNameCache(nMessageId, MessageName)
        local tMessage = pb.decode(pbName, Message)
        local tHandlerDict = self.tMessageDict[nMessageId]
        if type(tHandlerDict) == "table" and next(tHandlerDict) then
            for tScope, fCallback in pairs(tHandlerDict) do
                if tScope and fCallback then
                    xpcall(fCallback, _G.ErrorHandler, tScope, tMessage)
                end
            end
        end
    end
  4. 收到回复消息

    ---RspMessage
    ---收到回复消息
    ---@private
    ---@param nMessageId number
    ---@param nRspUID number
    ---@param nErrorCode number
    ---@param Message table
    function ClientNetworkManager:RspMessage(nMessageId, nRspUID, nErrorCode, Message)
        if not nRspUID or not Message then
            Error("RspMessage: 参数错误", nRspUID, Message ~= nil)
            return
        end
    
        local tWait = self.tWaitArray[1]
        if tWait and tWait.RspUID == nRspUID then
            if nMessageId ~= 0 then
                local MessageName = MESSAGE_NAME[nMessageId]
                if not MessageName then
                    Error("RspMessage: 没有找到对应的MessageName MessageId:", nMessageId)
                    return
                end
                local pbName = self:GetPBNameCache(nMessageId, MessageName)
                local tMessage = pb.decode(pbName, Message)
                xpcall(tWait.fResponse, _G.ErrorHandler, nErrorCode, tMessage)
    
                Log("RspMessage", nMessageId, nRspUID, MessageName, string.len(Message), "byte")
            else
                xpcall(tWait.fResponse, _G.ErrorHandler, nErrorCode)
            end
            local now = math.floor(GetUnixTime())
            Log("RspMessage", "CurrentTime:", now, "MessageEnqueueTime:", tWait.tTime, "Diff:", now - tWait.tTime)
            table.remove(self.tWaitArray, 1)
    
            local tNext = self.tWaitArray[1]
            if tNext then
                ---@type URPCComponent
        		local RPCComponent = GetRPCCompoent()
                if not RPCComponent then
                    Warning("RspMessage: 没找到RPCComponent")
                    return
                end
    
                local BytesArrayNum = string.len(tNext.bytes)
                local MessageName = MESSAGE_NAME[tNext.nMessageId]
                RPCComponent:C2S_ReqMessageWithBytes(tNext.nMessageId, tNext.RspUID, tNext.bytes)
            end
        else
            Warning("RspMessage: 收到回包不在消息队首中 已丢弃", nRspUID)
        end
    

评论