IM development notes: using Netty

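  My company has recently been building an IM group chat feature. The project draws on a lot of technology, and the code is nicely encapsulated. The stack includes Netty, ZooKeeper, Redis, thread pools, MongoDB, Lua, and more.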

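  Netty is a wrapper around Java NIO. Besides being efficient, it works around the epoll bug in NIO (the well-known case where a Selector can spin and burn CPU). So what exactly is Netty?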

 

  

 

 

The official website describes it roughly as follows:

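  Netty is an NIO client-server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket servers. "Quick and easy" does not mean that the resulting application will suffer from maintainability or performance issues. Netty has been designed carefully with the experience gained from implementing many protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has found a way to achieve ease of development, performance, stability, and flexibility without compromise.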

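So how do we actually use it?

The User Guide section of the official site shows that Netty 4 is the recommended version and that Netty 5 has been abandoned, so we develop against Netty 4.

First, the server: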

package com.cxy.sever;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DiscardServer {
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128) // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8081;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

package com.cxy.sever;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // On connect, send the current time as a 32-bit count of seconds since 1900.
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time);
        // Left disabled so the connection stays open after the write completes.
        /*f.addListener(new ChannelFutureListener() {

            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });*/
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        try {
            // Print every received byte as a character.
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            // The handler is responsible for releasing the reference-counted message.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

}
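The constant `2208988800L` in `channelActive` is the number of seconds between 1900-01-01 (the epoch used by the TIME protocol, RFC 868) and 1970-01-01 (the Unix epoch). Casting the sum to `int` keeps only the low 32 bits, which is why the receiving side has to read the value back as an unsigned integer. Here is a minimal sketch of that round trip. It uses the JDK's `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, and the names `TimeProtocolSketch`, `encode`, and `decode` are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Date;

final class TimeProtocolSketch {
    // Seconds between 1900-01-01T00:00:00Z and 1970-01-01T00:00:00Z.
    static final long SECONDS_1900_TO_1970 = 2208988800L;

    /** Encodes a Unix time in milliseconds as a 4-byte big-endian TIME value. */
    static byte[] encode(long unixMillis) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        // The cast keeps the low 32 bits, like time.writeInt((int) ...) in the handler.
        buf.putInt((int) (unixMillis / 1000L + SECONDS_1900_TO_1970));
        return buf.array();
    }

    /** Decodes the 4 bytes back to Unix milliseconds (whole seconds only). */
    static long decode(byte[] fourBytes) {
        // Masking reads the int as unsigned, the ByteBuffer analogue of readUnsignedInt().
        long secondsSince1900 = ByteBuffer.wrap(fourBytes).getInt() & 0xFFFFFFFFL;
        return (secondsSince1900 - SECONDS_1900_TO_1970) * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long roundTripped = decode(encode(now));
        System.out.println(new Date(roundTripped));
        // The wire format carries whole seconds, so the milliseconds are lost.
        System.out.println(roundTripped == now / 1000L * 1000L);
    }
}
```

Without the unsigned read, any present-day timestamp would come back negative, because the value no longer fits in a signed 32-bit `int`.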

The client:

package com.cxy.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = 8081;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

package com.cxy.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
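One caveat about `channelRead` above: TCP is a byte stream, so there is no guarantee that the four bytes of the timestamp arrive in a single read, and the handler as written assumes they do. A common remedy is to accumulate incoming bytes until a full 4-byte value is available. The sketch below shows that idea with a plain `java.nio.ByteBuffer`. `TimeAccumulator` and `feed` are hypothetical names. In a real Netty application this job is typically done by a decoder such as `ByteToMessageDecoder`, placed in the pipeline before the handler.

```java
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.OptionalLong;

final class TimeAccumulator {
    // Holds the bytes received so far; a TIME value is exactly 4 bytes.
    private final ByteBuffer pending = ByteBuffer.allocate(4);

    /**
     * Feeds one received chunk. Returns the decoded Unix time in milliseconds
     * once 4 bytes have been collected, or empty if more data is needed.
     * Bytes beyond the first 4 are ignored in this sketch.
     */
    OptionalLong feed(byte[] chunk) {
        int n = Math.min(chunk.length, pending.remaining());
        pending.put(chunk, 0, n);
        if (pending.hasRemaining()) {
            return OptionalLong.empty(); // still waiting for the rest
        }
        pending.flip();
        long secondsSince1900 = pending.getInt() & 0xFFFFFFFFL;
        pending.clear(); // ready for the next value
        return OptionalLong.of((secondsSince1900 - 2208988800L) * 1000L);
    }

    public static void main(String[] args) {
        byte[] wire = ByteBuffer.allocate(4)
                .putInt((int) (System.currentTimeMillis() / 1000L + 2208988800L))
                .array();
        TimeAccumulator acc = new TimeAccumulator();
        // Deliver the value in two fragments, as a slow network might.
        System.out.println(acc.feed(new byte[] {wire[0]}).isPresent());
        OptionalLong result = acc.feed(new byte[] {wire[1], wire[2], wire[3]});
        System.out.println(new Date(result.getAsLong()));
    }
}
```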

All of this code is taken from the introduction on the official site, so as you can see, this is all the basic code there is:

 

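 On the server side, the steps are:

  1 Build the server (ServerBootstrap)

  2 Initialize the socket channel

  3 Add the handler class

  4 Set the channel options

  5 Enable keep-alive for long-lived connections

  6 Bind the port

  7 Wait for the channel to close

  8 Shut down the boss and worker thread pools (event loop groups)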

 

 

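 On the client side, the steps are:

  1 Create the client (Bootstrap)

  2 Attach the thread pool (event loop group)

  3 Enable keep-alive for long-lived connections

  4 Register the handler initializer

  5 Add the handler class to the pipeline

  6 Connect to the target host and port

  7 Wait for the channel to close

  8 Shut down the worker thread pool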

 

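As you can see, the basic boilerplate for a Netty client and a Netty server is essentially the same. So what do we need to pay attention to when developing with Netty?

We only need to focus on developing our handlers; the rest needs little attention.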
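To make that last point concrete, here is a toy model of the pipeline idea, written without Netty so it runs on a bare JDK. It only illustrates why the bootstrap code stays fixed while the handlers carry the application logic. `MiniPipeline` and `InboundHandler` are hypothetical names and are far simpler than Netty's real `ChannelPipeline`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MiniPipeline {
    /** A handler receives a message and may pass a result on to the next handler. */
    interface InboundHandler {
        void channelRead(Object msg, Consumer<Object> next);
    }

    private final List<InboundHandler> handlers = new ArrayList<>();

    /** Appends a handler; returns this so calls can be chained. */
    MiniPipeline addLast(InboundHandler handler) {
        handlers.add(handler);
        return this;
    }

    /** Pushes a message through the handlers in insertion order. */
    void fireChannelRead(Object msg) {
        fire(0, msg);
    }

    private void fire(int index, Object msg) {
        if (index >= handlers.size()) {
            return; // end of the pipeline: nothing left to handle the message
        }
        handlers.get(index).channelRead(msg, out -> fire(index + 1, out));
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        MiniPipeline pipeline = new MiniPipeline()
                // "Decoder": raw bytes to text.
                .addLast((msg, next) -> next.accept(new String((byte[]) msg, StandardCharsets.UTF_8)))
                // Normalizer: strip surrounding whitespace.
                .addLast((msg, next) -> next.accept(((String) msg).trim()))
                // Business handler: the only part specific to the application.
                .addLast((msg, next) -> delivered.add((String) msg));
        pipeline.fireChannelRead(" hello group \n".getBytes(StandardCharsets.UTF_8));
        System.out.println(delivered);
    }
}
```

Swapping the last handler changes what the application does without touching the code that builds or drives the pipeline, which is the same division of labour the Netty examples above rely on.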