博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【转载】ESFramework 平台下可复用的Tcp通信层实现(续)
阅读量:6345 次
发布时间:2019-06-22

本文共 9150 字,大约阅读时间需要 30 分钟。

     上一篇主要讲到了Tcp通信层中的核心组件――Tcp组件的实现,Tcp组件是整个通信层的消息驱动源,甚至,可以将Tcp组件看作是我们整个服务器系统的消息驱动源,消息处理过程从这里引发。类似的消息驱动源还有发布的WebService接口、Remoting接口等。今天我们需要关注的是Tcp通信层中的“中央”组件――消息分派器组件ITcpReqStreamDispatcher,大家已经从前文的组件关系图中看到了消息分派器的大致位置和作用了,它是Tcp通信组件和消息处理器之间的“桥梁”。我们再对前文描述的通信层组件之间关系的一段话回顾一下:
    “当网络(Tcp)组件从某个Tcp连接上接收到一个请求时,会将请求转发给消息分派器,消息分派器通过IDataStreamHelper组件获取请求消息的类型,然后根据此类型要求处理器工厂创建对应类型的请求处理器,请求处理器处理请求并返回结果。接下来再由网络组件把结果返回给终端用户。在消息分派器进行请求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重组、消息验证等。”
    上面的描述中已经体现出了消息分派器的主要职责,在理解了消息分派器职责的基础上,我们可以进一步来看看消息分派器的定义和实现了。 

二.消息分派器组件

1.消息分派器组件接口的定义

    消息分派器的接口很简单:

    public
 
interface
 ITcpReqStreamDispatcher : IReqestStreamDispatcher 
    {        
        ArrayList DealRequestMessage(RequestData requestData ,
out
 
byte
[] leftData ,
ref
 RequestValidation validation) ;
//
同步回复
        
bool
      DealRequestMessage(RequestData requestData , NetworkStream userStream ,
out
 
byte
[] leftData) ; 
//
异步回复        
    }
    这个接口只有两个方法,第二个方法用于异步发送回复(即绕开
Tcp 组件发送回复),该方法的核心部分可以由第一个方法实现,我们把注意力放在第一个方法上,而
Tcp 组件与消息分派器进行交互的也正是第一个方法。我先解释一下这个方法的几个参数的含义:
    RequestData是对请求消息的封装:
    //
从网络接收到的原始数据的封装
    
public
 
class
 RequestData
    {
        
public
 
int
  ConnectID 
=
 
0
 ;
        
public
 
bool
 IsFirstMsg 
=
 
false
 ; 
//
标志是否为连接建立后的第一条消息
        
public
 
byte
[] Buff     
=
 
null
 ; 
//
接收数据缓冲区 ,可能其头部包含上次未处理完的数据
        
public
 
int
 ValidCount  
=
 
0
 ; 
//
缓冲区中有效字节的个数 >= 本次接收的字节数        
    }
    前面已经提到过,
ConnectID 用于标志每一个
Tcp 连接,
IsFirstMsg 用于表明是否为
tcp 连接建立后的第一个消息,因为我们可能需要对第一个消息进行额外的验证,比如,果第一个消息不是登录请求,就关闭该
Tcp 连接。
    第二个参数
leftData ,表示
RequestData.Buff 中的数据经过消息分裂器分裂之后余下的数据(一条非完整的消息),这些数据被
Tcp 组件用来放在下一次收到的数据的头部进行消息重组。
    第三个参数
validation ,是个
ref 参数,用于通知
Tcp 组件对消息验证的结果,如果验证失败,
Tcp 组件将关闭对应的
Tcp 连接。
    该方法的返回值是回复的集合,每一个回复对应一个请求,而
RequestData.Buff 中的数据可能分裂成多个请求。另外要注意,有些请求可能是没有回复消息的。
    在我们的
Tcp 组件的两种实现中,都可以看到类似下面的与消息分派器交互的语句:
                //
处理请求    
                
byte
[] leftData 
=
 
null
 ;                
                ArrayList repondList 
=
 
this
.messageDispatcher.DealRequestMessage(key.RequestData  ,
out
 leftData , 
ref
 key.Validation) ;
                
                
if
(
this
.validateRequest)
                {
                    
if
(key.Validation.gotoCloseConnection)
                    {
                        
this
.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;
                    }
                }
  
2 .消息分派器组件基本元素的实现
    正如在实现Tcp组件之前需要构建一些基本元素,在实现消息分派器之前也是如此,用于支持消息分派器实现的基本元素包括:IDataStreamHelper、消息分裂器、消息处理器工厂、ITcpStreamDispatcherHook等。
(1)IDataStreamHelper消息分裂器
    IDataStreamHelper,前文中已经提到,IDataStreamHelper用于从请求/回复消息中提取消息的“元数据”,并提供一些辅助方法,每个特定的应用,它们对IDataStreamHelper的实现可能是不一样的。IDataStreamHelper接口定义如下:
     ///
 
<summary>
    
///
 IDataStreamHelper 通信协议的面向流辅助设施。
    
///
 
</summary>
    
public
 
interface
 IDataStreamHelper :IStringEncoder
    {
        
int
 MaxRecieveBuffSize{
get
 ;} 
//
接收缓冲区的大小
        
int
 MessageHeaderLength{
get
 ;} 
//
消息头的长度
        
int
 OffsetOfLengthField{
get
 ;} 
//
表示消息长度的字段在消息头中的偏移
        IDataStreamHeader ParseMessageHeader(
byte
[] data ,
int
 offset) ; 
//
解析消息头
        LengthTypeInHeader LengthTypeInHeader{
get
 ;} 
        
        
byte
[] GetRespondWhenFailure(
byte
[] reqData ,ServiceFailureType failType) ;    
//
根据服务失败类型获取失败回复消息
        
byte
[] GetRespondWhenFailure(
byte
[] reqData ,
string
 errorMsg) ;            
    }
    
///
 
<summary>
    
///
 StringEncoder 限定字符串编码格式
    
///
 
</summary>
    
public
 
interface
 IStringEncoder
    {
        
string
 GetStrFromStream(
byte
[] stream ,
int
 offset ,
int
 len) ;
        
byte
[] GetBytesFromStr(
string
 ss) ;
    }
    
///
 
<summary>
    
///
 ServiceFailureType 服务失败类型
    
///
 
</summary>
    
public
 
enum
 ServiceFailureType
    {
        InvalidMessge ,ParseFailure ,HandleFailure ,ServiceStopped ,ServiceIsNotExit ,ServerIsBusy 
    }

    IDataStreamHeader即是我们所说的消息的“元数据”,如其名所示,它也是消息的“消息头”。请让我补充说明一下,依照我的经验,消息由消息头Header和消息主体Body组成,消息头用于存放消息的“元数据”等信息,而消息主体用于存放与特定请求相关的数据。消息头的长度固定,比如都是64字节或都是128字节。请求消息和回复消息公用相同格式的消息头。我们来看看消息头接口IDataStreamHeader的定义:

    public
 
interface
 IDataStreamHeader
    {
        
int
 MessageLength    {
get
 ;
set
 ;} 
//
本消息长度
        
int
 TypeKey            {
get
 ;
set
 ;} 
//
请求的目录类型
        
int
 ServiceKey        {
get
 ;
set
 ;} 
//
请求类型
        
int
 ServiceItemIndex{
get
 ;
set
 ;} 
//
请求细分索引
        
int
 RandomNum        {
get
 ;
set
 ;} 
//
用于将回复与请求一一对应起来        
        
int
 Result            {
get
 ;
set
 ;} 
//
服务结果    
    
        
string
 UserID        {
get
 ;
set
 ;} 
//
发出请求的用户编号
        
byte
[] ToDataStream() ;              
//
将消息头转化为流,流的长度位消息头的长度
        
void
   ToDataStream(
byte
[] buff ,
int
 offset);    
    }

    需要解释一下TypeKeyServiceKeyServiceItemIndex,我们实际上将服务类型分为三级,可以举个不太恰当的例子让大家有个感性的认识。比如,生活中的衣、食、住、行可以作为不同的TypeKey,而“衣”中的春装、冬装可作为ServiceKey,而“春装”中的T恤、夹克可作为ServiceItemIndex。对于服务的类型,你可以根据自己的意愿分成任意层级,但据我的经验,通常情况下,三层已经够用了。 

(2)消息分裂器

    前面已经多次提到消息分裂器MessageSplitter,它用于将接收缓冲区中的数据分裂成一个个完整的消息,并且把余下的非完整数据返回,其接口定义如下:

public
 
interface
 IMessageSplitter
    {
        
void
 Initialize(
int
 maxBuffSize ,
int
 headerLen ,
int
 offSetLenField ,LengthTypeInHeader lenType) ;
        ArrayList SplitRequestMsgs(
byte
[] buff ,
int
 validCount , 
out
 
byte
[] leftData) ;
//
ArrayList 中每条记录都是是byte[],表示一个完整的请求
    }
    
//
消息头中的长度是body长度还是总长度
    
public
 
enum
 LengthTypeInHeader
    {
        TotalLen ,BodyLen 
    }

    其中,Initialize方法中的参数都可以由IDataStreamHeader提供。leftData是余下的非完整消息的数据。SplitRequestMsgs方法返回的集合中是一条条完整的请求消息。

(3)消息处理器工厂

    消息处理器工厂根据消息的类型(TypeKeyServiceKey)创建对应的消息处理器来出来该消息,其接口定义如下:

    public
 
interface
 IRequestDealerFactory
    {
        IRequestDealer CreateDealer(
int
 requestType ,
int
 serverTypeKey)  ;
//
serverTypeKey 比如城市代号
        
event
 CbackRequestRecieved RequestRecieved ;
    }

    CreateDealer方法返回的IRequestDealer就是消息处理器,每一个消息处理器用于处理某种特定类型(ServiceKey)的所有请求。通常,可以将消息处理器封装成插件DLL,以实现功能服务的“热插拔”。 

(4)消息处理器

    消息处理器IRequestDealer定义如下:

    public
 
interface
 IRequestDealer
    {        
        
byte
[]  DealRequestMessage(RoundedRequestMsg reqMsg ) ;
//
同步回复
        
event
 CbackRequestRecieved RequestRecieved ;
    }
    
public
 
delegate
 
void
 CbackRequestRecieved(RoundedRequestMsg roundedMsg) ;
    
///
 
<summary>
    
///
 RoundedRequestMsg 对应于一条完整的请求
    
///
 
</summary>
    
public
 
struct
 RoundedRequestMsg
    {
        
public
 
int
 ConnectID ; 
//
请求所对应的Tcp连接
        
public
 
byte
[] Data ;
    }

    RoundedRequestMsg.Data是经消息分裂器分裂得到的一个完整的请求消息,一个字节不多、一个字节也不少。 

(5)ITcpStreamDispatcherHook

    ITcpStreamDispatcherHook是一个Hook,它为用户提供了一个自定义的对请求/回复消息进行操作的插入点。ITcpStreamDispatcherHookTcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理或转换这些消息,比如常用的处理/转换操作包括:加密/解密、消息验证等等。ITcpStreamDispatcherHook定义如下:

    ///
 
<summary>
    
///
 ITcpStreamDispatcherHook 由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理转换这些消息,
    
///
 比如加密/解密。  
    
///
 
</summary>
    
public
 
interface
 ITcpStreamDispatcherHook
    {
        
//
转换消息
        
byte
[] CaptureRequestMsg(
byte
[] roundedMsg) ;
        
byte
[] CaptureRespondMsg(
byte
[] roundedMsg) ;
        
//
验证消息,以下验证的消息是还没有被捕获的消息
        
bool
 VerifyFirstMsgOfUser(
byte
[] roundedMsg ,
ref
 RequestValidation validation) ;
        
bool
 VerifyOtherMessage(
byte
[]   roundedMsg ,
ref
 RequestValidation validation) ;
    }

    关于这个接口中各方法的含义可以在消息分派器的实现中更好的领会! 

3.  消息分派器实现

    在前述的基本元素的基础上,实现消息分派器非常简单,我们来看其核心方法DealRequestMessage的实现源码:

      private IMessageSplitter               curMsgSplitter = new MessageSpliter() ;
      private IDataStreamHelper            curMsgHelper ;  //必须设置
      private IRequestDealerFactory       curDealerFactory ;  //必须设置
      private ITcpStreamDispatcherHook tcpStreamDispatcherHook ;
       public
 ArrayList DealRequestMessage(RequestData requestData, 
out
 
byte
[] leftData, 
ref
 RequestValidation validation)
        {
            
//
消息分裂
            ArrayList respondList 
=
 
new
 ArrayList() ;
            ArrayList reqList 
=
 
this
.curMsgSplitter.SplitRequestMsgs(requestData.Buff ,requestData.ValidCount ,
out
 leftData) ;
            
if
(reqList 
==
 
null
)
            {
                
return
 respondList ;
            }                
            
bool
 verified 
=
 
true
 ;
            
for
(
int
 i
=
0
; i
<
reqList.Count ;i
++
)
            {        
                
byte
[] theData 
=
 (
byte
[])reqList[i] ;
                
#region
 验证消息                
                
if
(requestData.IsFirstMsg 
&&
 (i 
==
 
0
))
                {                        
                    verified 
=
 
this
.tcpStreamDispatcherHook.VerifyFirstMsgOfUser(theData ,
ref
 validation) ;                    
                }
                
else
                {                            
                    verified 
=
 
this
.tcpStreamDispatcherHook.VerifyOtherMessage(theData ,
ref
 validation ) ;                    
                }
                
if
(
!
 verified)
                {
                    
if
(validation.gotoCloseConnection)
                    {
                        
return
 
null
 ;
                    }
                    
this
.AddRespondToList(respondList ,
this
.curMsgHelper.GetRespondWhenFailure(theData ,ServiceFailureType.InvalidMessge)) ;
                    
continue
 ;
                }
                
#endregion
                
                
//
接插,捕获/转换请求消息
                
byte
[] reqData 
=
 
this
.tcpStreamDispatcherHook.CaptureRequestMsg(theData) ;                
                
#region
 处理消息
                
//
处理消息
                IDataStreamHeader header 
=
 
this
.curMsgHelper.ParseMessageHeader(reqData ,
0
);                
                IRequestDealer dealer 
=
 
this
.curDealerFactory.CreateDealer(header.ServiceKey ,header.TypeKey) ;
                
if
(dealer 
==
 
null
)
                {
                    
this
.AddRespondToList(respondList ,
this
.curMsgHelper.GetRespondWhenFailure(reqData ,ServiceFailureType.ServiceIsNotExit)) ;
                    
continue
 ;
                }
                RoundedRequestMsg roundReqMsg 
=
 
new
 RoundedRequestMsg();
                roundReqMsg.ConnectID 
=
 requestData.ConnectID ;
                roundReqMsg.Data 
=
 reqData ;    
                
try
                {
                    
byte
[] respondData 
=
 dealer.DealRequestMessage(roundReqMsg) ;
                    
                    
if
(respondData 
!=
 
null
)
                    {
                        
this
.AddRespondToList(respondList ,respondData) ;
                    }
                }
                
catch
(Exception ee)
                {                    
                    
this
.AddRespondToList(respondList , 
this
.curMsgHelper.GetRespondWhenFailure(reqData ,ee.Message)) ;
                }    
                
#endregion
            }
            
return
 respondList;
        }
        
//
将回复消息加密后放入list
        
private
 
void
 AddRespondToList(ArrayList list ,
byte
[] theRespondData)
        {
            
//
接插,捕获/转换回复消息
            
byte
[] respondData 
=
 
this
.tcpStreamDispatcherHook.CaptureRespondMsg(theRespondData) ;    
            list.Add(respondData) ;
        }

    如果你是一直按顺序读下来的,理解上面的实现一定不成什么问题。到这里,Tcp通信层的所有重要的设施基本都已介绍完毕,最后,给出了提示,即,在你的应用中,如何使用这个可复用的Tcp通信层。步骤如下:

(1)实现IDataStreamHelper接口。
(2)实现IReqestStreamDispatcher接口,如果采用的是Tcp协议,则可直接使用参考实现TcpStreamDispatcher
(3)实现各种请求处理器,这些处理器实现IRequestDealer接口。
(4)实现IRequestDealerFactory接口。 

    接下来,还有什么?其实,还有很多,都可以提高到框架的层次,以便复用。比如,前面我们处理消息都是基于流(byte[])的形式,在此基础上,我们可以更上一层,采用基于对象的形式――即,将请求消息和回复消息都封装成类,这就涉及了流的解析(流=>对象)和对象序列化(消息对象=>流)问题;另外,我们甚至可以将Tcp用户管理纳入到框架的高度,以进行复用,比如,通常基于Tcp服务的系统都需要管理在线的Tcp用户,并记录Tcp用户请求服务的具体信息、在线时间等,这些经过良好的分析概括都可以提高到复用的高度。以后有时间,我会将这样的经验和大家分享。

    最后,把EnterpriseServerBase类库中的Network命名空间中的源码和大家共享,希望对大家有所帮助!(另,该命名空间中已经包含了上述的基于对象的消息和Tcp用户管理的可复用组件)。

 

转载于:https://www.cnblogs.com/fx2008/archive/2011/11/25/2263505.html

你可能感兴趣的文章
取消凭证分解 (取消公司下的多个利润中心)
查看>>
flask ORM: Flask-SQLAlchemy【单表】增删改查
查看>>
vim 常用指令
查看>>
nodejs 获取自己的ip
查看>>
Nest.js 处理错误
查看>>
你好,C++(16)用表达式表达我们的设计意图——4.1 用操作符对数据进行运算...
查看>>
18.3 redis 的安装
查看>>
jdbc 简单连接
查看>>
Activiti 实战篇 小试牛刀
查看>>
java中的Static class
查看>>
Xshell 连接CentOS服务器解密
查看>>
[工具类]视频音频格式转换
查看>>
GNS3与抓包工具Wireshark的关联
查看>>
groovy-语句
查看>>
VIM寄存器使用
查看>>
Java VisualVM远程监控JVM
查看>>
nasm预处理器(2)
查看>>
二叉排序树 算法实验
查看>>
Silverlight 5 beta新特性探索系列:10.浏览器模式下内嵌HTML+浏览器模式下创建txt文本文件...
查看>>
YourSQLDba 配置——修改备份路径
查看>>