im开发总结:netty的使用
最近公司在做一个 im 群聊的开发,技术使用得非常多,各种代码封装得也是十分优美,使用到了 netty,zookeeper,redis,线程池·,mongdb,lua,等系列的技术
netty 是对 nio 的一种封装,很好从效率上解决了,nio 的 epoll 模型的 bug 问题,那么 netty 到底是个什么玩意呢:
这个是官网上的一个截图:
大概的意思就是:
Netty 是一种 NIO 客户端服务器框架,它支持网络应用程序 (如协议服务器和客户端) 的快速和轻松开发。它大大简化和简化了网络编程,如 TCP 和 UDP 套接字服务器。“快速和简单”并不意味着最终的应用程序会受到可维护性或性能问题的影响。Netty 是根据许多协议 (如 FTP、SMTP、HTTP 以及各种二进制和基于文本的遗留协议) 的实现所获得的经验精心设计的。因此,Netty 成功地找到了一种在不妥协的情况下实现轻松开发、性能、稳定和灵活性的方法。
那么到底怎么用呢:
我可以看到,userguid 部分:
官方推荐使用 netty4,netty5 已经废弃了,
所以我们可以 netty4,进行开发:
首先服务端:
package com.cxy.sever;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class DiscardServer {
private int port;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> DiscardServer(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> port) {
</span><span style="color: rgba(0, 0, 255, 1)">this</span>.port =<span style="color: rgba(0, 0, 0, 1)"> port;
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() throws Exception {
EventLoopGroup bossGroup </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> NioEventLoopGroup(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (1)</span>
EventLoopGroup workerGroup = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> NioEventLoopGroup();
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
ServerBootstrap b </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> ServerBootstrap(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (2)</span>
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Bind and start to accept incoming connections.</span>
ChannelFuture f = b.bind(port).sync(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (7)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Wait until the server socket is closed.
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> In this example, this does not happen, but you can do that to gracefully
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> shut down your server.</span>
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> main(String[] args) throws Exception {
</span><span style="color: rgba(0, 0, 255, 1)">int</span> port = <span style="color: rgba(128, 0, 128, 1)">8081</span><span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (args.length > <span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">) {
port </span>= Integer.parseInt(args[<span style="color: rgba(128, 0, 128, 1)">0</span><span style="color: rgba(0, 0, 0, 1)">]);
}
</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> DiscardServer(port).run();
}
}
package com.cxy.sever;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> channelActive(final ChannelHandlerContext ctx) { <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (1)</span>
final ByteBuf time = ctx.alloc().buffer(<span style="color: rgba(128, 0, 128, 1)">4</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (2)</span>
time.writeInt((<span style="color: rgba(0, 0, 255, 1)">int</span>) (System.currentTimeMillis() / <span style="color: rgba(128, 0, 128, 1)">1000L</span> + <span style="color: rgba(128, 0, 128, 1)">2208988800L</span><span style="color: rgba(0, 0, 0, 1)">));
final ChannelFuture f </span>= ctx.writeAndFlush(time); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (3)</span>
<span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)</span><span style="color: rgba(0, 128, 0, 1)">*/</span><span style="color: rgba(0, 0, 0, 1)">
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> channelRead(ChannelHandlerContext ctx, Object msg) { <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (2)</span>
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();}
} finally {ReferenceCountUtil.release(msg); // (2)
}
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (4)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Close the connection when an exception is raised.</span>
cause.printStackTrace();
ctx.close();
}
}
客户端
package com.cxy.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 8081;
EventLoopGroup workerGroup = new NioEventLoopGroup();
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
Bootstrap b </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Bootstrap(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (1)</span>
b.group(workerGroup); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (2)</span>
b.channel(NioSocketChannel.<span style="color: rgba(0, 0, 255, 1)">class</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (3)</span>
b.option(ChannelOption.SO_KEEPALIVE, <span style="color: rgba(0, 0, 255, 1)">true</span>); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (4)</span>
b.handler(<span style="color: rgba(0, 0, 255, 1)">new</span> ChannelInitializer<SocketChannel><span style="color: rgba(0, 0, 0, 1)">() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> TimeClientHandler());
}
});
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Start the client.</span>
ChannelFuture f = b.connect(host, port).sync(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> (5)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Wait until the connection is closed.</span>
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
package com.cxy.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
} </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
m.release();
}
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
整个代码都是官网的上的介绍,拿下来的,所以可以看到,整个基础代码就是那么多:
其中 1 是构建服务端
2 初始化 socket 通道
3 添加助手类
4 设置 option
5 设置长连接
6 绑定端口
7 关闭 channel
8 关闭主从线程池模型
1 创建客户端
2 加入线程池
3 设置长连接
4 绑定相关助手类
5 流水线上添加助手类
6 绑定连接的主机和端口
7 关闭 channel
8 关闭工作线程池
可以看出,那么整合 netty 客户端,服务端的基础代码都是一致的,那么我们使用 netty 开发需要注意哪些问题呢
我们著需要关闭我们的 handler 的开发,其他的不用关心。