Netty与Spring Boot的整合
最近有朋友向我询问一些 Netty 与 SpringBoot 整合的相关问题,这里,我就总结了一下基本整合流程,也就是说,这篇文章 ,默认大家是对 netty 与 Spring,SpringMVC 的整合是没有什么问题的。现在,就进入正题吧。
Server 端:#
总的来说,服务端还是比较简单的,自己一共写了三个核心类。分别是
- NettyServerListener:服务启动监听器
- ServerChannelHandlerAdapter:通道适配器,主要用于多线程共享
- RequestDispatcher:请求分排器
下面开始集成过程:
-
在 pom.xml 中添加以下依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
-
让 SpringBoot 的启动类实现 CommandLineRunner 接口并重写 run 方法, 比如我的启动类是 CloudApplication.java
@SpringBootApplication public class CloudApplication implements CommandLineRunner {
<span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-built_in">void</span> <span class="hljs-title function_">main</span>(<span class="hljs-params"><span class="hljs-built_in">String</span>[] args</span>) { <span class="hljs-title class_">SpringApplication</span>.<span class="hljs-title function_">run</span>(<span class="hljs-title class_">CloudApplication</span>.<span class="hljs-property">class</span>, args); } <span class="hljs-meta">@Override</span> <span class="hljs-keyword">public</span> <span class="hljs-built_in">void</span> <span class="hljs-title function_">run</span>(<span class="hljs-params"><span class="hljs-built_in">String</span>... strings</span>) { }
}
-
创建类 NettyServerListener.java
// 读取 yml 的一个配置类 import com.edu.hart.modules.constant.NettyConfig; // Netty 连接信息配置类 import com.edu.hart.modules.constant.NettyConstant; // import com.edu.hart.rpc.util.ObjectCodec; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;/**
-
服务启动监听器
@author 叶云轩
/
public class NettyServerListener {
/*- NettyServerListener 日志输出器
- @author 叶云轩 create by 2017/10/31 18:05
/
private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
/* - 创建 bootstrap
/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/* - BOSS
/
EventLoopGroup boss = new NioEventLoopGroup();
/* - Worker
/
EventLoopGroup work = new NioEventLoopGroup();
/* - 通道适配器
/
private ServerChannelHandlerAdapter channelHandlerAdapter;
/* - NETT 服务器配置类
*/
private NettyConfig nettyConfig;
/**
- 关闭服务器方法
*/
public void close() {
LOGGER.info("关闭服务器....");
// 优雅退出
boss.shutdownGracefully();
work.shutdownGracefully();
}
/**
开启及服务线程
*/
public void start() {
// 从配置文件中 (application.yml) 获取服务端监听端口号
int port = nettyConfig.getPort();
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
// 设置事件处理
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength()
, 0, 2, 0, 2));
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new ObjectCodec());pipeline.addLast(channelHandlerAdapter); } }); LOGGER.info(<span class="hljs-string">"netty服务器在[{}]端口启动监听"</span>, port); <span class="hljs-type">ChannelFuture</span> <span class="hljs-variable">f</span> <span class="hljs-operator">=</span> serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.info("[出现异常] 释放资源");
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
-
-
创建类 ServerChannelHandlerAdapter.java - 通道适配器
// 记录调用方法的元信息的类 import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
多线程共享
/
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
/*- 日志处理
/
private Logger logger = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class);
/* - 注入请求分排器
*/
private RequestDispatcher dispatcher;
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
// 屏蔽 toString() 方法
if (invokeMeta.getMethodName().endsWith("toString()")
&& !"class java.lang.String".equals(invokeMeta.getReturnType().toString()))
logger.info("客户端传入参数 :{}, 返回值:{}",
invokeMeta.getArgs(), invokeMeta.getReturnType());
dispatcher.dispatcher(ctx, invokeMeta);
}
}- 日志处理
-
RequestDispatcher.java
// 封装的返回信息枚举类 import com.edu.hart.modules.communicate.ResponseCodeEnum; // 封装的返回信息实体类 import com.edu.hart.modules.communicate.ResponseResult; // 封装的连接常量类 import com.edu.hart.modules.constant.NettyConstant; // 记录元方法信息的实体类 import com.edu.hart.rpc.entity.MethodInvokeMeta; // 对于返回值为空的一个处理 import com.edu.hart.rpc.entity.NullWritable; // 封装的返回信息实体工具类 import com.edu.hart.rpc.util.ResponseResultUtil; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/**
请求分排器
*/
public class RequestDispatcher implements ApplicationContextAware {
private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
private ApplicationContext app;/**
- 发送
- @param ctx
- @param invokeMeta
*/
public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
executorService.submit(() -> {
ChannelFuture f = null;
try {
Class<?> interfaceClass = invokeMeta.getInterfaceClass();
String name = invokeMeta.getMethodName();
Object[] args = invokeMeta.getArgs();
Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
Object targetObject = app.getBean(interfaceClass);
Method method = targetObject.getClass().getMethod(name, parameterTypes);
Object obj = method.invoke(targetObject, args);
if (obj == null) {
f = ctx.writeAndFlush(NullWritable.nullWritable());
} else {
f = ctx.writeAndFlush(obj);
}
f.addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
f = ctx.writeAndFlush(error);
} finally {
f.addListener(ChannelFutureListener.CLOSE);
}
});
}
/**
- 加载当前 application.xml
- @param ctx
@throws BeansException
*/
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
this.app = ctx;
}
}
-
application.yml 文件中对于 netty 的一个配置
netty: port: 11111
-
NettyConfig.java
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component;
/**
-
读取 yml 配置文件中的信息
-
Created by 叶云轩 on 2017/10/31 - 18:38
Concat tdg_yyx@foxmail.com
*/
public class NettyConfig {private int port;
public int getPort() {
return port;
}public void setPort(int port) {
this.port = port;
}
}
-
-
NettyConstanct.java
import org.springframework.stereotype.Component;
/**
-
Netty 服务器常量
-
Created by 叶云轩 on 2017/10/31 - 17:47
Concat tdg_yyx@foxmail.com
*/
@Component
public class NettyConstant {/**
- 最大线程量
/
private static final int MAX_THREADS = 1024;
/* - 数据包最大长度
*/
private static final int MAX_FRAME_LENGTH = 65535;
public static int getMaxFrameLength() {
return MAX_FRAME_LENGTH;
}public static int getMaxThreads() {
return MAX_THREADS;
}
}- 最大线程量
-
至此,netty 服务端算是与 SpringBoot 整合成功。那么看一下启动情况吧。
Client 端:#
Client 我感觉要比 Server 端要麻烦一点。这里还是先给出核心类吧。
- NettyClient : netty 客户端
- ClientChannelHandlerAdapter : 客户端通道适配器
- CustomChannelInitalizer:自定义通道初始化工具
- RPCProxyFactoryBean:RPC 通信代理工厂
在 Client 端里。SpringBoot 的启动类要继承 SpringBootServletInitializer 这个类,并覆盖 SpringApplicationBuilder 方法
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
@SpringBootApplication
public class OaApplication extends SpringBootServletInitializer {
public static void main(<span class="hljs-type">String</span>[] args) {
<span class="hljs-type">SpringApplication</span>.run(<span class="hljs-type">OaApplication</span>.<span class="hljs-keyword">class</span>, args);
}
<span class="hljs-meta">@Override</span>
<span class="hljs-keyword">protected</span> <span class="hljs-type">SpringApplicationBuilder</span> configure(<span class="hljs-type">SpringApplicationBuilder</span> builder) {
<span class="hljs-keyword">return</span> builder.sources(<span class="hljs-type">OaApplication</span>.<span class="hljs-keyword">class</span>);
}
}
-
NettyClient.java
// 记录元方法信息的实体类 import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import javax.management.MBeanServer;
/**
-
客户端发送类
-
Created by 叶云轩 on 2017/6/16-16:58
Concat tdg_yyx@foxmail.com
*/
public class NettyClient {private Logger logger = LoggerFactory.getLogger(MBeanServer.class);
private Bootstrap bootstrap;
private EventLoopGroup worker;
private int port;
private String url;
private int MAX_RETRY_TIMES = 10;public NettyClient(String url, int port) {
this.url = url;
this.port = port;
bootstrap = new Bootstrap();
worker = new NioEventLoopGroup();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
}public void close() {
logger.info("关闭资源");
worker.shutdownGracefully();
}public Object remoteCall(final MethodInvokeMeta cmd, int retry) {
try {
CustomChannelInitializerClient customChannelInitializer = new CustomChannelInitializerClient(cmd);
bootstrap.handler(customChannelInitializer);
ChannelFuture sync = bootstrap.connect(url, port).sync();
sync.channel().closeFuture().sync();
Object response = customChannelInitializer.getResponse();
return response;
} catch (InterruptedException e) {
retry++;
if (retry > MAX_RETRY_TIMES) {
throw new RuntimeException("调用 Wrong");
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
logger.info("第 {} 次尝试.... 失败", retry);
return remoteCall(cmd, retry);
}
}
}
}
-
-
ClientChannelHandlerAdapter.java
import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
-
Created by 叶云轩 on 2017/6/16-17:03
Concat tdg_yyx@foxmail.com
*/
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(ClientChannelHandlerAdapter.class);
private MethodInvokeMeta methodInvokeMeta;
private CustomChannelInitializerClient channelInitializerClient;public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, CustomChannelInitializerClient channelInitializerClient) {
this.methodInvokeMeta = methodInvokeMeta;
this.channelInitializerClient = channelInitializerClient;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.info("客户端出异常了, 异常信息:{}", cause.getMessage());
cause.printStackTrace();
ctx.close();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (methodInvokeMeta.getMethodName().endsWith("toString") && !"class java.lang.String".equals(methodInvokeMeta.getReturnType().toString()))
logger.info("客户端发送信息参数:{}, 信息返回值类型:{}", methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType());
ctx.writeAndFlush(methodInvokeMeta);}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
channelInitializerClient.setResponse(msg);
}
}
-
-
CustomChannelInitializerClient.java
import com.edu.hart.rpc.entity.MethodInvokeMeta; import com.edu.hart.rpc.entity.NullWritable; import com.edu.hart.rpc.util.ObjectCodec; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
- Created by 叶云轩 on 2017/6/16-15:01
- Concat tdg_yyx@foxmail.com
*/
public class CustomChannelInitializerClient extends ChannelInitializer {
private Logger logger = LoggerFactory.getLogger(CustomChannelInitializerClient.class);
private MethodInvokeMeta methodInvokeMeta;
private Object response;
public CustomChannelInitializerClient(MethodInvokeMeta methodInvokeMeta) {
if (!"toString".equals(methodInvokeMeta.getMethodName())) {
logger.info("[CustomChannelInitializerClient] 调用方法名:{},入参:{}, 参数类型:{},返回值类型 {}"
, methodInvokeMeta.getMethodName()
, methodInvokeMeta.getArgs()
, methodInvokeMeta.getParameterTypes()
, methodInvokeMeta.getReturnType());
}
this.methodInvokeMeta = methodInvokeMeta;
}
public Object getResponse() {
if (response instanceof NullWritable) {
return null;
}
return response;
}
public void setResponse(Object response) {
this.response = response;
}
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2));
pipeline.addLast(new ObjectCodec());
pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this));
}
}
4. RPCProxyFactoryBean.java
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.util.WrapMethodUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* Created by 叶云轩 on 2017/6/16-17:16
* Concat tdg_yyx@foxmail.com
*/
public class RPCProxyFactoryBean extends AbstractFactoryBean