博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
DotNetty的通道处理细节
阅读量:2236 次
发布时间:2019-05-09

本文共 7996 字,大约阅读时间需要 26 分钟。

第一,客户端如何向服务器主动发送消息;

第二,服务器如何向指定客户端发送消息;

第三,在哪里做报文的拆包和组包。

public partial class FrmMain : Form    {        public static object synobj = new object();        public static Int64 count = 0;        public static DateTime dt1 = DateTime.Now;        public static DateTime dt2 = DateTime.Now.AddSeconds(1);        private Timer t = new Timer();        private List
listClients = new List
(); public FrmMain() { InitializeComponent(); t.Interval = 1000; t.Tick += T_Tick; t.Start(); } private void T_Tick(object sender, EventArgs e) { this.Text = (count / (FrmMain.dt2 - FrmMain.dt1).TotalSeconds).ToString(); } ///
/// 启动服务器 /// private async void btnStartServer_Click(object sender, EventArgs e) { IEventLoopGroup mainGroup; IEventLoopGroup workerGroup; mainGroup = new MultithreadEventLoopGroup(1); workerGroup = new MultithreadEventLoopGroup(); var bootstrap = new ServerBootstrap(); bootstrap.Group(mainGroup, workerGroup); bootstrap.Channel
(); bootstrap .Option(ChannelOption.SoBacklog, 100) .Handler(new LoggingHandler("SRV-LSTN")) .ChildHandler(new ActionChannelInitializer
(channel => { //每个客户端的连接创建,都会执行,channel代表了具体的连接客户端,以下过程为每个客户端连接创建编解码器。 //这里可以对channel进行统一管理,保存到列表当中,这样在主程序(服务器)中就可以针对特定的客户端(即channel)进行消息的发送。 IChannelPipeline pipeline = channel.Pipeline; listClients.Add(channel); pipeline.AddLast(new LoggingHandler("SRV-CONN")); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); //pipeline.AddLast("heart", new IdleStateHandler(0, 0, 3000 / 1000)); pipeline.AddLast("echo", new EchoServerHandler()); })); dt1 = DateTime.Now; IChannel boundChannel = await bootstrap.BindAsync(5000); #region 模拟服务器向客户端发送消息,前提是,客户端连接后,要保存channel到列表。 //Task.Run(() => //{ // while (true) // { // for (int i = 0; i < listClients.Count; i++) // { // var t = listClients[i];//代表某个客户端连接 // if (t == null) { return; } // var initialMessage = Unpooled.Buffer(256); // byte[] messageBytes = Encoding.UTF8.GetBytes("=======发送消息给客户端======="); // initialMessage.WriteBytes(messageBytes); // t.WriteAndFlushAsync(initialMessage); // } // } //}); #endregion } ///
/// 启动客户端 /// private async void btnStartClient_Click(object sender, EventArgs e) { List
list = new List
(); for (int i = 0; i < 1; i++) { var group = new MultithreadEventLoopGroup(); var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel
() .Option(ChannelOption.TcpNodelay, true) .Handler(new ActionChannelInitializer
(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new LoggingHandler()); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); //pipeline.AddLast("heart", new IdleStateHandler(0, 0, 3000 / 1000)); pipeline.AddLast("echo", new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(textBox1.Text, 5000); //clientChannel为客户端持有的连接对象,可以通过它主动向服务器发起请求,clientChannel.WriteAndFlushAsync() list.Add(clientChannel); } System.Threading.Thread.Sleep(1000); #region 模拟客户端向服务器发送消息,前提是,客户端链接后,要保存channel。 //list.ForEach(t => //{ // var initialMessage = Unpooled.Buffer(256); // byte[] messageBytes = Encoding.UTF8.GetBytes("====发送消息给服务器===="); // initialMessage.WriteBytes(messageBytes); // t.WriteAndFlushAsync(initialMessage); //}); #endregion } private void FrmMain_Load(object sender, EventArgs e) { //ConsoleLoggerProvider provider = new ConsoleLoggerProvider(new ConsoleLoggerSettings()); //InternalLoggerFactory.DefaultFactory.AddProvider(provider); } }

 上面的代码主要是找到了服务器和客户端各自向对方发送数据的入口点,具体设计时可以对IChannel对象进行封装和维护。那么,对于我们自定义协议,我们怎样进行数据包的组包和拆包呢?答案就时上面代码中的EchoServerHandler和EchoClientHandler两个通道处理器对象。以服务器部分的代码为例:

pipeline.AddLast(new LoggingHandler("SRV-CONN"));pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));//pipeline.AddLast("heart", new IdleStateHandler(0, 0, 3000 / 1000));pipeline.AddLast("echo", new EchoServerHandler());

 服务中创建了以上四个IChannel接口对象,他们之间是什么关系呢?顺序执行!接收和发送都按照AddLast的先后顺序执行。如接收数据时,先做日志处理,再做解码LengthFieldBasedFrameDecoder,最后做EchoServerHandler的自定义处理,因为是接收,所以不做编码LengthFieldPrepender这个处理,这是DotNetty内部判断的,LengthFieldPrepender是继承了MessageToMessageEncoder的,MessageToMessageEncoder本身就代表了编码操作,而接收数据不需要做编码,所以这个操作会被略过。LengthFieldPrepender是在服务器发送数据时才做。

  每个处理过程都接收上个处理过程的处理结果,比如EchoServerHandler接收到的数据,是LengthFieldBasedFrameDecoder处理完成后的输出。演示程序的协议类型是头部两个字节代表数据包长度,后面是数据体,这样在LengthFieldBasedFrameDecoder处理完成后,EchoServerHandler接收到的是不包含描述长度的两个字节,只有数据体部分的数据,这样我们就可以在自定义的EchoServerHandler中,进行数据体的拆包操作了。

  EchoServerHandler和EchoClientHandler的代码如下:

public class EchoServerHandler : ChannelHandlerAdapter    {        public override void ChannelRead(IChannelHandlerContext context, object message)        {            var buffer = message as IByteBuffer;            if (buffer != null)            {                lock (FrmMain.synobj)                {                    FrmMain.count++;                }                FrmMain.dt2 = DateTime.Now;                Console.WriteLine(System.Threading.Thread.CurrentThread.ManagedThreadId + "Received from client: " + buffer.ToString(Encoding.UTF8) + "=" + FrmMain.count / (FrmMain.dt2 - FrmMain.dt1).TotalSeconds);            }            context.WriteAsync(message);        }         public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();         public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)        {            Console.WriteLine("Exception: " + exception);            context.CloseAsync();        }    }
public class EchoClientHandler : ChannelHandlerAdapter    {        readonly IByteBuffer initialMessage;         public EchoClientHandler()        {            this.initialMessage = Unpooled.Buffer(256);            byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world");            this.initialMessage.WriteBytes(messageBytes);        }         public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);         public override void ChannelRead(IChannelHandlerContext context, object message)        {            var byteBuffer = message as IByteBuffer;            if (byteBuffer != null)            {                Console.WriteLine(System.Threading.Thread.CurrentThread.ManagedThreadId + "Received from server: " + byteBuffer.ToString(Encoding.UTF8));            }             //System.Threading.Thread.Sleep(500);            context.WriteAsync(message);        }         public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();         public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)        {            Console.WriteLine("Exception: " + exception);            context.CloseAsync();        }    }

 

转载于:https://www.cnblogs.com/zeroone/p/8480917.html

你可能感兴趣的文章
sublime text3 快捷键修改
查看>>
关于PHP几点建议
查看>>
硬盘的接口、协议
查看>>
VLAN与子网划分区别
查看>>
Cisco Packet Tracer教程
查看>>
02. 交换机的基本配置和管理
查看>>
03. 交换机的Telnet远程登陆配置
查看>>
微信小程序-调用-腾讯视频-解决方案
查看>>
phpStudy安装yaf扩展
查看>>
密码 加密 加盐 常用操作记录
查看>>
TP 分页后,调用指定页。
查看>>
Oracle数据库中的(+)连接
查看>>
java-oracle中几十个实用的PL/SQL
查看>>
PLSQL常用方法汇总
查看>>
几个基本的 Sql Plus 命令 和 例子
查看>>
PLSQL单行函数和组函数详解
查看>>
Oracle PL/SQL语言初级教程之异常处理
查看>>
Oracle PL/SQL语言初级教程之游标
查看>>
Oracle PL/SQL语言初级教程之操作和控制语言
查看>>
Oracle PL/SQL语言初级教程之过程和函数
查看>>