UnLua中实现基于RPC的网络数据收发机制
前言
在 UE 中使用 RPC 来负责与 DS 的通信,但是在使用 RPC 时它是通过封装成函数调用来使用的。如果发送一个复杂结构的数据就需要定义一个结构体才行,如果在蓝图或者 C++ 中使用倒还能接受都是强类型的。但是放到 Lua 中用的话就比较繁琐了,首先要在蓝图中定义 RPC 函数如果是复杂数据结构还要定义对应的结构体,然后再通过反射在 Lua 中创建结构体并调用 RPC 函数实现。所以我通过仿照游戏中常见的使用 protobuffer 来封装数据,通过一个固定的 rpc 函数来承载数据发送。这样既避免了每次要定义 RPC 函数和结构体的操作,又能在这过程中加入一些自定义的机制,比如队列、监听等机制。
使用方法
定义Proto
新建 proto 文件,定义需要发送消息的数据结构
需要在 proto 中使用固定 package 名
// cs_demo.proto syntax = "proto2"; package com.tencent.wea.protocol.xxx;定义消息结构可参考示例 proto
// cs_demo.proto message Demo_Req { optional int32 Id = 1; } message Demo_Rsp { optional int32 Id = 1; } message Demo_Ntf { optional int32 Id = 1; }
定义枚举
定义消息枚举并且把枚举和结构绑定
定义消息枚举,文件为:MessageEnum.lua
-- 全局访问方法 _G.MessageEnum.XXX -- 文件结构 ---@class MessageEnum local MessageEnum = { E_NONE = 0, -- 示例 E_DEMO_REQ = 1101, --请求测试 E_DEMO_RSP = 1102, --请求回复测试 E_DEMO_NTF = 1103, --主动推送测试 }关联消息结构,文件为::MessageName.lua
-- 全局访问方法 _G.MessageName[_G.MessageEnum.XXX] -- 文件结构 ---@type MessageEnum local MessageEnum = _G.MessageEnum ---@class MessageName local MessageName = { [MessageEnum.E_DEMO_REQ] = "Demo_Req", [MessageEnum.E_DEMO_RSP] = "Demo_Rsp", [MessageEnum.E_DEMO_NTF] = "Demo_Ntf", }定义消息错误码,路径为
MessageError.lua -- 全局访问方法 _G.MessageError.XX -- 文件结构 ---@class MessageError local MessageError = { E_SUCCESS = 0, --成功 }
Client
客户端收发消息等操作
发消息到 DS
---发送消息给DS ---参数1为消息ID,通过消息枚举获取 ---参数2为消息体 ---参数3为消息回复回调,可为空,如果传入回调函数则DS端必须回复。否则后续再发送需要回复的消息会一直排队等待 ---回包参数1为错误码,通过错误码枚举获取判断 ---回包参数2为消息体,如DS没有返回消息则为空 _G.Client.Network:SendMessage(_G.MessageEnum.E_DEMO_REQ, tMessage, function(nErrorCode, tRspMessage) Log("GM_DemoRsp", nErrorCode, tRspMessage.Id) end)注册监听消息
---注册监听事件 ---参数1为消息ID,通过消息枚举获取 ---参数2为注册的对象 ---参数3为回调函数 _G.Client.Network:RegisterMessage(_G.MessageEnum.E_DEMO_NTF, self, self.On_DemoNtf) ---On_DemoNtf ---测试接收消息 ---@param tMessage Demo_Rsp 消息体 function XXX:On_DemoNtf(tMessage) Log("GM_DemoNtf", tMessage.Id) end取消注册监听
---取消注册监听,取消该对象注册的所有监听 _G.Client.Network:UnRegisterWithScope(self) ---取消注册监听,取消特定对象的特定消息 _G.Client.Network:UnRegisterMessage(_G.MessageEnum.E_DEMO_NTF, self)
DS
DS 收发消息等操作
发送消息到 Client
---发送消息 ---参数1为玩家UID ---参数2为消息ID,通过消息枚举获取 ---参数3为消息体 ---参数4为是否用不可靠方式下发消息(默认为false) _G.DS.Network:SendMessage(10010, _G.MessageEnum.E_DEMO_NTF, tNtfMessage, bUnReliable)广播消息到 Client
---广播消息,广播给所有玩家 ---参数1为消息ID,通过消息枚举获取 ---参数2为消息体 ---参数3为是否用不可靠方式下发消息(默认为false) _G.DS.Network:BroadcastMessage(_G.MessageEnum.E_DEMO_NTF, tNtfMessage, bUnReliable)注册监听消息
---注册监听事件 (每个消息只能注册一次) ---参数1为消息ID,通过消息枚举获取 ---参数2为注册对象 ---参数3为回调函数 _G.DS.Network:RegisterMessage(_G.MessageEnum.E_DEMO_REQ, self, self.On_DemoReq) ---On_DemoReq ---测试接收请求消息 ---@param nPlayerUID number 玩家UID ---@param tMessage Demo_Req 消息结构 ---@param fReply function 回复方法 ---@param nRspUID number 回复UID,通常不需要使用,如果出现回复不在该函数处理时需要自行缓存 function XXX:On_DemoReq(nPlayerUID, tMessage, fReply, nRspUID) ---回复消息体 ---@type Demo_Rsp local tRspMessage = { Id = tMessage.Id + 100 } ---回复函数(方式一:带消息体) ---参数1为回复错误码,通过错误码枚举获取 ---参数1为回复消息ID,通过消息枚举获取 ---参数2为回复结构 fReply(MessageError.E_SUCCESS, _G.MessageEnum.E_DEMO_RSP, tRspMessage) ---回复函数(方式二:只带错误码) ---参数1为回复错误码,通过错误码枚举获取 fReply(_G.MessageError.E_SUCCESS) end回复消息到 Client
---回复消息 ---参数1为玩家UID ---参数2为回复UID,通过监听消息获取 ---参数3为回复错误码,通过错误码枚举获取 ---参数4为回复消息ID,通过消息枚举获取(如果为0或_SP.MessageEnum.E_NONE则不回复消息体) ---参数5为回复消息体 _G.DS.Network:ReplyMessage(10010, nRspUID, nErrorCode, _G.MessageEnum.E_DEMO_NTF, tNtfMessage)取消注册监听消息
---取消注册监听,取消该消息ID监听的事件 ---参数1为消息ID _G.DS.Network:UnRegisterMessage(nMessageId) ---取消注册监听,取消该对象所有消息监听 ---参数1为对象 _G.DS.Network:UnRegisterWithScope(self)
实现细节
RPC函数定义与调用
新建一个专门发 RPC 的组件,负责使用 RPC 机制收发消息。起名为“URPCCompoent”。挂在每个玩家的 PlayerController 中。
该组件通过 UnLua 的绑定脚本机制,绑定到名为“RPCComponent.lua”的脚本中,该脚本定义通用收发消息的的 RPC 函数。
---@class RPCComponent : URPCCompoent
local RPCComponent = UE4.Class()
-- DS收到消息
function RPCComponent:C2S_ReqMessageWithBytes_Lua(MessageID, RspUID, Message)
_G.DS.Network:ReceiveMessage(self.UID, MessageID, RspUID, Message)
end
-- Client收到DS回复消息
function RPCComponent:S2C_RspMessageWithBytes_Lua(MessageID, RspUID, ErrorCode, Message)
if not _G.IsDS then
_G.Client.Network:RspMessage(MessageID, RspUID, ErrorCode, Message)
end
end
-- Client收到DS广播消息
function RPCComponent:S2C_NtfMessageWithBytes_Lua(MessageID, Message)
if not _G.IsDS then
_G.Client.Network:NtfMessage(MessageID, Message)
end
end
-- Client收到DS广播不可靠消息
function RPCComponent:S2C_NtfMessageWithBytes_UnReliable_Lua(MessageID, Message)
if not _G.IsDS then
_G.Client.Network:NtfMessage(MessageID, Message)
end
end
return RPCComponent定义 Client 向 DS 发送消息的 RPC 函数
// 定义函数 UFUNCTION(Server, Reliable) void C2S_ReqMessage(int32 MessageId, int32 RspUID, const TArray<uint8>& Message); // 函数实现 void URPCCompoent::C2S_ReqMessage_Implementation(int32 MessageId, int32 RspUID, const TArray<uint8>& Message) { // 此处逻辑为客户端通过RPC发到DS上,DS接受到消息回调过来的函数。 // 由于是要和Lua进行交互,所以此处直接调用Lua函数 // 这里不通过反射机制实现是为了增加一些执行效率,如用FString做载体传入lua的话有几率丢失数据,\0问题。如果更好的解法请指教。 lua_State* L = UnLua::GetState(); // if (L) { const int32 Top = lua_gettop(L); lua_pushcfunction(L, UnLua::ReportLuaCallError); // 添加错误处理函数 UnLua::PushUObject(L, this); // 调用该类绑定的Lua中的函数 if (lua_getfield(L, -1, "C2S_ReqMessageWithBytes_Lua") == LUA_TFUNCTION) // 获取C2S_ReqMessageWithBytes_Lua函数 { size_t Size = Message.Num(); char* Bytes = static_cast<char*>(FMemory::Malloc(Size * sizeof(char))); ArrayToBytes(Message, Bytes); // 数组转换为字符串 UnLua::PushUObject(L, this); // 压栈参数 lua_pushinteger(L, MessageId); lua_pushinteger(L, RspUID); lua_pushlstring(L, Bytes, Size); lua_pcall(L, 4, 0, Top + 1); // 调用函数 FMemory::Free(Bytes); } lua_settop(L, Top); } }定义 DS 向 Client 发送消息的 RPC 函数
// 定义函数 UFUNCTION(Client, Reliable) void S2C_NtfMessage(int32 MessageId, const TArray<uint8>& Message); // 函数实现 void URPCCompoent::S2C_NtfMessage_Implementation(int32 MessageId, const TArray<uint8>& Message) { lua_State* L = UnLua::GetState(); if (L) { const int32 Top = lua_gettop(L); lua_pushcfunction(L, UnLua::ReportLuaCallError); // 添加错误处理函数 UnLua::PushUObject(L, this); // 调用该类绑定的Lua中的函数 if (lua_getfield(L, -1, "S2C_NtfMessageWithBytes_Lua") == LUA_TFUNCTION) // 获取S2C_NtfMessageWithBytes_Lua函数 { size_t Size = Message.Num(); char* Bytes = static_cast<char*>(FMemory::Malloc(Size * sizeof(char))); ArrayToBytes(Message, Bytes); // 数组转换为字符串 UnLua::PushUObject(L, this); // 压栈参数 lua_pushinteger(L, MessageId); lua_pushlstring(L, Bytes, Size); lua_pcall(L, 3, 0, Top + 1); // 调用函数 FMemory::Free(Bytes); } lua_settop(L, Top); } } // 定义函数 UFUNCTION(Client, Reliable) void S2C_RspMessage(int32 MessageId, int32 RspUID, int32 ErrorCode, const TArray<uint8>& Message); // 函数实现 - 仿照S2C_NtfMessage逻辑即可 新增了ErrorCode参数 // 定义函数 UFUNCTION(Client, Unreliable) void S2C_NtfMessage_UnReliable(int32 MessageId, const TArray<uint8>& Message); // 函数实现 - 仿照C2S_ReqMessage逻辑即可Client 接收 DS 广播消息
static int32 URPCCompoent_S2C_NtfMessageWithBytes(lua_State* L) { const int32 NumParams = lua_gettop(L); if (NumParams < 4) return luaL_error(L, "invalid parameters"); URPCCompoent* Self = Get(L, 1, UnLua::TType<URPCCompoent*>()); if (!IsValid(Self)) return luaL_error(L, "self is invalid"); const int32 MessageId = lua_tointeger(L, 2); const int bUnReliable = lua_toboolean(L, 3); TArray<uint8> Message; if (lua_isnil(L, 4) == false) { size_t Size = 0; const char* Bytes = lua_tolstring(L, 4, &Size); BytesToArray(Bytes, Size, Message); } if (bUnReliable != 0) Self->S2C_NtfMessage_UnReliable(MessageId, Message); else Self->S2C_NtfMessage(MessageId, Message); return 0; }Client 发送消息到 DS
static int32 URPCCompoent_C2S_ReqMessageWithBytes(lua_State* L) { const int32 NumParams = lua_gettop(L); if (NumParams < 3) return luaL_error(L, "invalid parameters"); URPCCompoent* Self = Get(L, 1, UnLua::TType<URPCCompoent*>()); if (!IsValid(Self)) return luaL_error(L, "self is invalid"); const int32 MessageId = lua_tointeger(L, 2); const int32 RspUID = lua_tointeger(L, 3); TArray<uint8> Message; if (lua_isnil(L, 4) == false) { size_t Size = 0; const char* Bytes = lua_tolstring(L, 4, &Size); BytesToArray(Bytes, Size, Message); } Self->C2S_ReqMessage(MessageId, RspUID, Message); return 0; }DS 回复 Client 消息
static int32 URPCCompoent_S2C_RspMessageWithBytes(lua_State* L) { const int32 NumParams = lua_gettop(L); if (NumParams < 4) return luaL_error(L, "invalid parameters"); URPCCompoent* Self = Get(L, 1, UnLua::TType<URPCCompoent*>()); if (!IsValid(Self)) return luaL_error(L, "self is invalid"); const int32 MessageId = lua_tointeger(L, 2); const int32 RspUID = lua_tointeger(L, 3); const int32 ErrorCode = lua_tointeger(L, 4); TArray<uint8> Message; if (lua_isnil(L, 5) == false) { size_t Size = 0; const char* Bytes = lua_tolstring(L, 5, &Size); BytesToArray(Bytes, Size, Message); } Self->S2C_RspMessage(MessageId, RspUID, ErrorCode, Message); return 0; }
Lua管理类封装
DS
基础函数
-- 获取对应玩家ID的RPCCompoent function GetRPCCompoent(nPlayerUID) -- 根据PlayerUID获取对应的Controller -- 再获取到对应的RPCCompoennt并返回 end -- 获取所有玩家的RPCCompoent function GetRPCComponentArray() -- 获取所有玩家的RPCCompoennt并返回 end -- DS初始化时调用 function DSNetworkManager:Init() -- 初始化必要容器 self.tMessageDict = {} self.tScopeDict = {} self.tWaitArray = {} self.tPbNameDict = {} self.lastUpdateTime = 0 end -- DS结束时调用 function DSNetworkManager:Shutdown() -- 清空必要容器 self.tMessageDict = nil self.tScopeDict = nil self.tWaitArray = nil self.tPbNameDict = nil end -- 检测回包超时并丢弃 function DSNetworkManager:Tick(dt) local now = math.floor(GetUnixTime()) if now - self.lastUpdateTime < 1 then return end self.lastUpdateTime = now for PlayerUID, WaitInfo in pairs(self.tWaitArray) do if now - WaitInfo.nTime > 3 then Error("CustomTick 回包超时", PlayerUID, WaitInfo.nRspUID, WaitInfo.nMessageId) self.tWaitArray[PlayerUID] = nil end end end -- PBName缓存 function DSNetworkManager:GetPBNameCache(nMessageId, MessageName) local pbName = self.tPbNameDict[nMessageId] if pbName then return pbName end pbName = PB_HEAD .. MessageName self.tPbNameDict[nMessageId] = pbName return pbName end网络消息注册与监听
---RegisterMessage ---注册消息 ---@public ---@param nMessageId number 消息ID ---@param tScope table 注册对象 ---@param fCallback function 回调函数 function DSNetworkManager:RegisterMessage(nMessageId, tScope, fCallback) if not nMessageId or not tScope or not fCallback then Error("RegisterMessage 参数错误", nMessageId, tScope ~= nil, fCallback ~= nil) return end local tHandler = self.tMessageDict[nMessageId] if tHandler then Error("重复注册 MessageId:", nMessageId) return end self.tMessageDict[nMessageId] = {tScope = tScope, fCallback = fCallback} local tScopeDict = self.tScopeDict[tScope] if not tScopeDict then tScopeDict = {} self.tScopeDict[tScope] = tScopeDict end tScopeDict[nMessageId] = true end ---UnRegisterMessage ---取消注册消息 ---@public ---@param nMessageId number 消息ID function DSNetworkManager:UnRegisterMessage(nMessageId) if not nMessageId then Error("UnRegisterMessage 参数错误", nMessageId) return end local tHandler = self.tMessageDict[nMessageId] if tHandler then local tScope = tHandler.tScope local tScopeDict = self.tScopeDict[tScope] if tScopeDict then tScopeDict[nMessageId] = nil end end self.tMessageDict[nMessageId] = nil end ---UnRegisterWithScope ---取消注册消息 ---@public ---@param tScope table 注册对象 function DSNetworkManager:UnRegisterWithScope(tScope) if not tScope then Error("UnRegisterWithScope tScope为空") return end local tScopeDict = self.tScopeDict[tScope] if tScopeDict then for nMessageId, _ in pairs(tScopeDict) do self.tMessageDict[nMessageId] = nil end end self.tScopeDict[tScope] = nil end消息发送与广播
---SendMessage ---发送消息 ---@public ---@param nPlayerUID number 玩家UID ---@param nMessageId number 消息ID ---@param tMessage table 消息结构 ---@param bUnReliable boolean 不可靠 function DSNetworkManager:SendMessage(nPlayerUID, nMessageId, tMessage, bUnReliable) if not nPlayerUID or not nMessageId or not tMessage then Error("SendMessage: 获取参数失败", nPlayerUID, nMessageId, tMessage) return end local MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("SendMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end if type(tMessage) ~= "table" then Error("SendMessage: tMessage不为table MessageId:", nMessageId) return end ---@type URPCComponent local RPCComponent = GetRPCCompoent(nPlayerUID) if not RPCComponent then Warning("SendMessage: 获取RPCComponent失败:", nPlayerUID) return end local pbName = self:GetPBNameCache(nMessageId, MessageName) local bytes = pb.encode(pbName, tMessage) RPCComponent:S2C_NtfMessageWithBytes(nMessageId, bUnReliable, bytes) end ---BroadcastMessage ---广播消息 ---@public ---@param nMessageId number 消息ID ---@param tMessage table 消息结构 ---@param bUnReliable boolean 不可靠 function DSNetworkManager:BroadcastMessage(nMessageId, tMessage, bUnReliable) local MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("BroadcastMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end if type(tMessage) ~= "table" then Error("BroadcastMessage: tMessage不为table MessageId:", nMessageId) return end local RPCComponentArray = GetRPCComponentArray() local pbName = self:GetPBNameCache(nMessageId, MessageName) local bytes = pb.encode(pbName, tMessage) for i = 1, RPCComponentArray:Length() do ---@type URPCComponent local RPCComponent = RPCComponentArray:Get(i) if RPCComponent then RPCComponent:S2C_NtfMessageWithBytes(nMessageId, bUnReliable, bytes) end end end消息回复
---ReplyMessage ---回复消息 ---@public ---@param nPlayerUID number 玩家UID ---@param nRspUID number 回复UID ---@param nErrorCode number 错误码 ---@param nMessageId number 消息ID ---@param tMessage table 消息结构 function DSNetworkManager:ReplyMessage(nPlayerUID, nRspUID, nErrorCode, nMessageId, tMessage) if not nPlayerUID or not nRspUID or not nErrorCode then Error("ReplyMessage: 获取参数失败", nPlayerUID, nRspUID, nErrorCode) return end ---@type URPCComponent local RPCComponent = GetRPCCompoent(nPlayerUID) if not RPCComponent then Warning("ReplyMessage: 获取RPCComponent失败:", nPlayerUID) return end if nErrorCode ~= _SP.MessageError.E_SUCCESS then Warning("ReplyMessage: 错误码错误:", nPlayerUID, nMessageId, nErrorCode) end local MessageName = "" if nMessageId then MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("ReplyMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end if type(tMessage) ~= "table" then Error("ReplyMessage: tMessage不为table MessageId:", nMessageId) return end local pbName = self:GetPBNameCache(nMessageId, MessageName) local bytes = pb.encode(pbName, tMessage) RPCComponent:S2C_RspMessageWithBytes(nMessageId, nRspUID, nErrorCode, bytes) else RPCComponent:S2C_RspMessageWithBytes(0, nRspUID, nErrorCode) end end消息接收
---ReceiveMessage ---接收消息 ---@private ---@param nPlayerUID number 玩家UID ---@param nRspUID number 回复UID ---@param nMessageId number 消息ID ---@param Message table 消息结构 function DSNetworkManager:ReceiveMessage(nPlayerUID, nMessageId, nRspUID, Message) if not nPlayerUID or not nMessageId or not nRspUID or not Message then Error("ReceiveMessage: 参数错误", nPlayerUID, nMessageId, nRspUID) return end local nStart = os.clock() local MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("ReceiveMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end if nRspUID > 0 then if nPlayerUID ~= 0 and self.tWaitArray[nPlayerUID] == nil then local nNow = math.floor(GetUnixTime()) self.tWaitArray[nPlayerUID] = { nRspUID = nRspUID, nMessageId = nMessageId, nTime = nNow, nClockTime = nStart, } else Warning("ReceiveMessage: 客户端重复发送请求或用户不存在", nPlayerUID, nMessageId) end end local fReply = function(nErrorCode, nReplyMessageId, tMessage) if nRspUID > 0 then self:ReplyMessage(nPlayerUID, nRspUID, nErrorCode, nReplyMessageId, tMessage) if nPlayerUID ~= 0 and self.tWaitArray[nPlayerUID] ~= nil then local Info = self.tWaitArray[nPlayerUID] self.tWaitArray[nPlayerUID] = nil else Warning("ReceiveMessage: 客户端回包队列请求不存在或用户不存在", nPlayerUID, nMessageId) end else Warning("ReceiveMessage: 客户端没有接收该回复", nMessageId) end end local pbName = self:GetPBNameCache(nMessageId, MessageName) local tMessage = pb.decode(pbName, Message) local tHandler = self.tMessageDict[nMessageId] if tHandler then local tScope = tHandler.tScope local fCallback = tHandler.fCallback if tScope and fCallback then xpcall(fCallback, _G.ErrorHandler, tScope, nPlayerUID, tMessage, fReply, nRspUID) end end end
Client
基础函数
-- 获取自己的RPCCompoent
function GetRPCCompoent()
-- 获取自己的Controller
-- 再获取到对应的RPCCompoennt并返回
end
-- 客户端初始化
function ClientNetworkManager:Init()
self.tMessageDict = {}
self.tScopeDict = {}
self.tPbNameDict = {}
self:ClearWaitArray(false)
end
-- 客户端结束
function ClientNetworkManager:Shutdown()
self.tMessageDict = {}
self.tScopeDict = {}
self.tPbNameDict = {}
end
-- 清理等待列表
function ClientNetworkManager:ClearWaitArray(timeout)
local nArrayLength = self.tWaitArray == nil and 0 or #self.tWaitArray
if timeout then
for i = 1, nArrayLength do
local tWait = self.tWaitArray[i]
if tWait and tWait.fResponse then
Warning("ClearWaitArray rpc call timeout", tWait.nMessageId, tWait.RspUID)
xpcall(tWait.fResponse, _G.ErrorHandler, _SP.MessageError.E_RPC_CALL_TIMEOUT)
end
end
end
self.nCount = 0
self.tWaitArray = {}
end
-- 获取消息自增ID
function ClientNetworkManager:GetMessageUID()
if self.nCount == math.maxinteger then
self.nCount = 0
end
self.nCount = self.nCount + 1
return self.nCount
end
-- 缓存PBName
function ClientNetworkManager:GetPBNameCache(nMessageId, MessageName)
local pbName = self.tPbNameDict[nMessageId]
if pbName then
return pbName
end
pbName = PB_HEAD .. MessageName
self.tPbNameDict[nMessageId] = pbName
return pbName
end网络消息注册与监听
---RegisterMessage ---注册消息 ---@public ---@param nMessageId number 消息ID ---@param tScope table 注册对象 ---@param fCallback function 回调函数 function ClientNetworkManager:RegisterMessage(nMessageId, tScope, fCallback) if not nMessageId or not tScope or not fCallback then Error("RegisterMessage 参数错误", nMessageId, tScope ~= nil, fCallback ~= nil) return end local tScopeDict = self.tScopeDict[tScope] if not tScopeDict then tScopeDict = {} self.tScopeDict[tScope] = tScopeDict end if tScopeDict[nMessageId] then Error("RegisterMessage, 同Scope重复注册 MessageId:", nMessageId) return end tScopeDict[nMessageId] = true local tHandlerDict = self.tMessageDict[nMessageId] if not tHandlerDict then tHandlerDict = {} self.tMessageDict[nMessageId] = tHandlerDict end tHandlerDict[tScope] = fCallback end ---UnRegisterMessage ---取消注册消息 ---@public ---@param nMessageId number 消息ID ---@param tScope table 注册对象 function ClientNetworkManager:UnRegisterMessage(nMessageId, tScope) if not nMessageId or not tScope then Error("UnRegisterMessage 参数错误", nMessageId, tScope ~= nil) return end local tScopeDict = self.tScopeDict[tScope] if tScopeDict then tScopeDict[nMessageId] = nil end local tHandlerDict = self.tMessageDict[nMessageId] if tHandlerDict then tHandlerDict[tScope] = nil end end ---UnRegisterWithScope ---取消注册消息 ---@public ---@param tScope table 环境对象 function ClientNetworkManager:UnRegisterWithScope(tScope) if not tScope then Error("UnRegisterWithScope tScope为空") return end local tScopeDict = self.tScopeDict[tScope] if tScopeDict then for nMessageId, _ in pairs(tScopeDict) do local tHandlerDict = self.tMessageDict[nMessageId] tHandlerDict[tScope] = nil end end self.tScopeDict[tScope] = nil end发送消息
---SendMessage ---发送消息 ---@public ---@param nMessageId number 消息ID ---@param tMessage table 消息结构 ---@param fResponse function 消息回复 function ClientNetworkManager:SendMessage(nMessageId, tMessage, fResponse) if not nMessageId or not tMessage then Error("SendMessage: 参数错误", nMessageId, tMessage ~= nil) return end local MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("SendMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end if type(tMessage) ~= "table" then Error("SendMessage: tMessage不为table MessageId:", nMessageId) return end ---@type URPCComponent local RPCComponent = GetRPCCompoent() if not RPCComponent then Warning("SendMessage: 没找到RPCComponent") return end local pbName = self:GetPBNameCache(nMessageId, MessageName) local bytes = pb.encode(pbName, tMessage) local BytesArrayNum = string.len(bytes) if fResponse then local RspUID = self:GetMessageUID() local now = math.floor(GetUnixTime()) Log("SendMessage", "Server Time:", now) local nWaitLength = #self.tWaitArray -- 如果出现超时情况,清空当前等待队列所有请求,只发送当前请求 if nWaitLength > 0 then local tWait = self.tWaitArray[1] if now - tWait.tTime > 10 then Warning("SendMessage Timeout", tWait.nMessageId, tWait.RspUID, nWaitLength, string.len(tWait.bytes), "byte") self:ClearWaitArray(true) nWaitLength = 0 end end table.insert(self.tWaitArray, { RspUID = RspUID, nMessageId = nMessageId, bytes = bytes, fResponse = fResponse, tTime = now, }) nWaitLength = nWaitLength + 1 if nWaitLength == 1 then RPCComponent:C2S_ReqMessageWithBytes(nMessageId, RspUID, bytes) Log("SendMessage", nMessageId, RspUID, MessageName, BytesArrayNum, "byte") else local nTopIndex = next(self.tWaitArray) local nWaitTop = 0 if nTopIndex then nWaitTop = self.tWaitArray[nTopIndex].nMessageId end Warning("SendMessage", nMessageId, RspUID, MessageName, BytesArrayNum, "byte", "WaitTop:", nWaitTop, "WaitLength:", nWaitLength) end else RPCComponent:C2S_ReqMessageWithBytes(nMessageId, 0, bytes) Log("SendMessage", nMessageId, 0, MessageName, BytesArrayNum, "byte") end end接收消息
---NtfMessage ---收到推送消息 ---@private ---@param nMessageId number ---@param Message table function ClientNetworkManager:NtfMessage(nMessageId, Message) if not nMessageId or not Message then Error("ReceiveMessage: 参数错误", nMessageId, Message ~= nil) return end local MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("NtfMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end if not self.tMessageDict then Error("NtfMessage: 没有初始化就收到了消息推送 MessageId:", nMessageId) return end local pbName = self:GetPBNameCache(nMessageId, MessageName) local tMessage = pb.decode(pbName, Message) local tHandlerDict = self.tMessageDict[nMessageId] if type(tHandlerDict) == "table" and next(tHandlerDict) then for tScope, fCallback in pairs(tHandlerDict) do if tScope and fCallback then xpcall(fCallback, _G.ErrorHandler, tScope, tMessage) end end end end收到回复消息
---RspMessage ---收到回复消息 ---@private ---@param nMessageId number ---@param nRspUID number ---@param nErrorCode number ---@param Message table function ClientNetworkManager:RspMessage(nMessageId, nRspUID, nErrorCode, Message) if not nRspUID or not Message then Error("RspMessage: 参数错误", nRspUID, Message ~= nil) return end local tWait = self.tWaitArray[1] if tWait and tWait.RspUID == nRspUID then if nMessageId ~= 0 then local MessageName = MESSAGE_NAME[nMessageId] if not MessageName then Error("RspMessage: 没有找到对应的MessageName MessageId:", nMessageId) return end local pbName = self:GetPBNameCache(nMessageId, MessageName) local tMessage = pb.decode(pbName, Message) xpcall(tWait.fResponse, _G.ErrorHandler, nErrorCode, tMessage) Log("RspMessage", nMessageId, nRspUID, MessageName, string.len(Message), "byte") else xpcall(tWait.fResponse, _G.ErrorHandler, nErrorCode) end local now = math.floor(GetUnixTime()) Log("RspMessage", "CurrentTime:", now, "MessageEnqueueTime:", tWait.tTime, "Diff:", now - tWait.tTime) table.remove(self.tWaitArray, 1) local tNext = self.tWaitArray[1] if tNext then ---@type URPCComponent local RPCComponent = GetRPCCompoent() if not RPCComponent then Warning("RspMessage: 没找到RPCComponent") return end local BytesArrayNum = string.len(tNext.bytes) local MessageName = MESSAGE_NAME[tNext.nMessageId] RPCComponent:C2S_ReqMessageWithBytes(tNext.nMessageId, tNext.RspUID, tNext.bytes) end else Warning("RspMessage: 收到回包不在消息队首中 已丢弃", nRspUID) end
评论