Netty与Spring Boot的整合

​ 最近有朋友向我询问一些 Netty 与 SpringBoot 整合的相关问题,这里,我就总结了一下基本整合流程,也就是说,这篇文章 ,默认大家是对 netty 与 Spring,SpringMVC 的整合是没有什么问题的。现在,就进入正题吧。

Server 端:#

总的来说,服务端还是比较简单的,自己一共写了三个核心类。分别是

  • NettyServerListener:服务启动监听器
  • ServerChannelHandlerAdapter:通道适配器,主要用于多线程共享
  • RequestDispatcher:请求分排器

下面开始集成过程:

  1. 在 pom.xml 中添加以下依赖

    Copy
    <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
  2. 让 SpringBoot 的启动类实现 CommandLineRunner 接口并重写 run 方法, 比如我的启动类是 CloudApplication.java

    Copy
    @SpringBootApplication public class CloudApplication implements CommandLineRunner {
    <span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-built_in">void</span> <span class="hljs-title function_">main</span>(<span class="hljs-params"><span class="hljs-built_in">String</span>[] args</span>) {
        <span class="hljs-title class_">SpringApplication</span>.<span class="hljs-title function_">run</span>(<span class="hljs-title class_">CloudApplication</span>.<span class="hljs-property">class</span>, args);
    }
    
    <span class="hljs-meta">@Override</span>
    <span class="hljs-keyword">public</span> <span class="hljs-built_in">void</span> <span class="hljs-title function_">run</span>(<span class="hljs-params"><span class="hljs-built_in">String</span>... strings</span>) {
    }
    

    }

  3. 创建类 NettyServerListener.java

    Copy
    // 读取 yml 的一个配置类 import com.edu.hart.modules.constant.NettyConfig; // Netty 连接信息配置类 import com.edu.hart.modules.constant.NettyConstant; // import com.edu.hart.rpc.util.ObjectCodec; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;

    import javax.annotation.PreDestroy;
    import javax.annotation.Resource;

    /**

    • 服务启动监听器

    • @author 叶云轩
      /
      @Component
      public class NettyServerListener {
      /
      *

      • NettyServerListener 日志输出器
      • @author 叶云轩 create by 2017/10/31 18:05
        /
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
        /
        *
      • 创建 bootstrap
        /
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        /
        *
      • BOSS
        /
        EventLoopGroup boss = new NioEventLoopGroup();
        /
        *
      • Worker
        /
        EventLoopGroup work = new NioEventLoopGroup();
        /
        *
      • 通道适配器
        /
        @Resource
        private ServerChannelHandlerAdapter channelHandlerAdapter;
        /
        *
      • NETT 服务器配置类
        */
        @Resource
        private NettyConfig nettyConfig;

      /**

      • 关闭服务器方法
        */
        @PreDestroy
        public void close() {
        LOGGER.info("关闭服务器....");
        // 优雅退出
        boss.shutdownGracefully();
        work.shutdownGracefully();
        }

      /**

      • 开启及服务线程
        */
        public void start() {
        // 从配置文件中 (application.yml) 获取服务端监听端口号
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss, work)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO));
        try {
        // 设置事件处理
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength()
        , 0, 2, 0, 2));
        pipeline.addLast(new LengthFieldPrepender(2));
        pipeline.addLast(new ObjectCodec());

                 pipeline.addLast(channelHandlerAdapter);
             }
         });
         LOGGER.info(<span class="hljs-string">"netty服务器在[{}]端口启动监听"</span>, port);
         <span class="hljs-type">ChannelFuture</span> <span class="hljs-variable">f</span> <span class="hljs-operator">=</span> serverBootstrap.bind(port).sync();
         f.channel().closeFuture().sync();
        

        } catch (InterruptedException e) {
        LOGGER.info("[出现异常] 释放资源");
        boss.shutdownGracefully();
        work.shutdownGracefully();
        }
        }
        }

  4. 创建类 ServerChannelHandlerAdapter.java - 通道适配器

    Copy
    // 记录调用方法的元信息的类 import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;

    import javax.annotation.Resource;

    /**

    • 多线程共享
      /
      @Component
      @Sharable
      public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
      /
      *

      • 日志处理
        /
        private Logger logger = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class);
        /
        *
      • 注入请求分排器
        */
        @Resource
        private RequestDispatcher dispatcher;

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      cause.printStackTrace();
      ctx.close();
      }

      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
      MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
      // 屏蔽 toString() 方法
      if (invokeMeta.getMethodName().endsWith("toString()")
      && !"class java.lang.String".equals(invokeMeta.getReturnType().toString()))
      logger.info("客户端传入参数 :{}, 返回值:{}",
      invokeMeta.getArgs(), invokeMeta.getReturnType());
      dispatcher.dispatcher(ctx, invokeMeta);
      }
      }

  5. RequestDispatcher.java

    Copy
    // 封装的返回信息枚举类 import com.edu.hart.modules.communicate.ResponseCodeEnum; // 封装的返回信息实体类 import com.edu.hart.modules.communicate.ResponseResult; // 封装的连接常量类 import com.edu.hart.modules.constant.NettyConstant; // 记录元方法信息的实体类 import com.edu.hart.rpc.entity.MethodInvokeMeta; // 对于返回值为空的一个处理 import com.edu.hart.rpc.entity.NullWritable; // 封装的返回信息实体工具类 import com.edu.hart.rpc.util.ResponseResultUtil; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;

    import java.lang.reflect.Method;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**

    • 请求分排器
      */
      @Component
      public class RequestDispatcher implements ApplicationContextAware {
      private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
      private ApplicationContext app;

      /**

      • 发送
      • @param ctx
      • @param invokeMeta
        */
        public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
        executorService.submit(() -> {
        ChannelFuture f = null;
        try {
        Class<?> interfaceClass = invokeMeta.getInterfaceClass();
        String name = invokeMeta.getMethodName();
        Object[] args = invokeMeta.getArgs();
        Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
        Object targetObject = app.getBean(interfaceClass);
        Method method = targetObject.getClass().getMethod(name, parameterTypes);
        Object obj = method.invoke(targetObject, args);
        if (obj == null) {
        f = ctx.writeAndFlush(NullWritable.nullWritable());
        } else {
        f = ctx.writeAndFlush(obj);
        }
        f.addListener(ChannelFutureListener.CLOSE);
        } catch (Exception e) {
        ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
        f = ctx.writeAndFlush(error);
        } finally {
        f.addListener(ChannelFutureListener.CLOSE);
        }
        });
        }

      /**

      • 加载当前 application.xml
      • @param ctx
      • @throws BeansException
        */
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        this.app = ctx;
        }
        }
  6. application.yml 文件中对于 netty 的一个配置

    Copy
    netty: port: 11111
  7. NettyConfig.java

    Copy
    import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component;

    /**

    • 读取 yml 配置文件中的信息

    • Created by 叶云轩 on 2017/10/31 - 18:38

    • Concat tdg_yyx@foxmail.com
      */
      @Component
      @ConfigurationProperties(prefix = "netty")
      public class NettyConfig {

      private int port;

      public int getPort() {
      return port;
      }

      public void setPort(int port) {
      this.port = port;
      }
      }

  8. NettyConstanct.java

    Copy
    import org.springframework.stereotype.Component;

    /**

    • Netty 服务器常量

    • Created by 叶云轩 on 2017/10/31 - 17:47

    • Concat tdg_yyx@foxmail.com
      */
      @Component
      public class NettyConstant {

      /**

      • 最大线程量
        /
        private static final int MAX_THREADS = 1024;
        /
        *
      • 数据包最大长度
        */
        private static final int MAX_FRAME_LENGTH = 65535;

      public static int getMaxFrameLength() {
      return MAX_FRAME_LENGTH;
      }

      public static int getMaxThreads() {
      return MAX_THREADS;
      }
      }

  9. 至此,netty 服务端算是与 SpringBoot 整合成功。那么看一下启动情况吧。

    spring-boot netty集成启动

Client 端:#

Client 我感觉要比 Server 端要麻烦一点。这里还是先给出核心类吧。

  • NettyClient : netty 客户端
  • ClientChannelHandlerAdapter : 客户端通道适配器
  • CustomChannelInitalizer:自定义通道初始化工具
  • RPCProxyFactoryBean:RPC 通信代理工厂

在 Client 端里。SpringBoot 的启动类要继承 SpringBootServletInitializer 这个类,并覆盖 SpringApplicationBuilder 方法

Copy
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.web.support.SpringBootServletInitializer;

@SpringBootApplication
public class OaApplication extends SpringBootServletInitializer {

public static void main(<span class="hljs-type">String</span>[] args) {
    <span class="hljs-type">SpringApplication</span>.run(<span class="hljs-type">OaApplication</span>.<span class="hljs-keyword">class</span>, args);
}

<span class="hljs-meta">@Override</span>
<span class="hljs-keyword">protected</span> <span class="hljs-type">SpringApplicationBuilder</span> configure(<span class="hljs-type">SpringApplicationBuilder</span> builder) {
    <span class="hljs-keyword">return</span> builder.sources(<span class="hljs-type">OaApplication</span>.<span class="hljs-keyword">class</span>);
}

}

  1. NettyClient.java

    Copy
    // 记录元方法信息的实体类 import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

    import javax.management.MBeanServer;

    /**

    • 客户端发送类

    • Created by 叶云轩 on 2017/6/16-16:58

    • Concat tdg_yyx@foxmail.com
      */
      public class NettyClient {

      private Logger logger = LoggerFactory.getLogger(MBeanServer.class);
      private Bootstrap bootstrap;
      private EventLoopGroup worker;
      private int port;
      private String url;
      private int MAX_RETRY_TIMES = 10;

      public NettyClient(String url, int port) {
      this.url = url;
      this.port = port;
      bootstrap = new Bootstrap();
      worker = new NioEventLoopGroup();
      bootstrap.group(worker);
      bootstrap.channel(NioSocketChannel.class);
      }

      public void close() {
      logger.info("关闭资源");
      worker.shutdownGracefully();
      }

      public Object remoteCall(final MethodInvokeMeta cmd, int retry) {
      try {
      CustomChannelInitializerClient customChannelInitializer = new CustomChannelInitializerClient(cmd);
      bootstrap.handler(customChannelInitializer);
      ChannelFuture sync = bootstrap.connect(url, port).sync();
      sync.channel().closeFuture().sync();
      Object response = customChannelInitializer.getResponse();
      return response;
      } catch (InterruptedException e) {
      retry++;
      if (retry > MAX_RETRY_TIMES) {
      throw new RuntimeException("调用 Wrong");
      } else {
      try {
      Thread.sleep(100);
      } catch (InterruptedException e1) {
      e1.printStackTrace();
      }
      logger.info("第 {} 次尝试.... 失败", retry);
      return remoteCall(cmd, retry);
      }
      }
      }
      }

  2. ClientChannelHandlerAdapter.java

    Copy
    import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

    /**

    • Created by 叶云轩 on 2017/6/16-17:03

    • Concat tdg_yyx@foxmail.com
      */
      public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
      private Logger logger = LoggerFactory.getLogger(ClientChannelHandlerAdapter.class);
      private MethodInvokeMeta methodInvokeMeta;
      private CustomChannelInitializerClient channelInitializerClient;

      public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, CustomChannelInitializerClient channelInitializerClient) {
      this.methodInvokeMeta = methodInvokeMeta;
      this.channelInitializerClient = channelInitializerClient;
      }

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      logger.info("客户端出异常了, 异常信息:{}", cause.getMessage());
      cause.printStackTrace();
      ctx.close();
      }

      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
      if (methodInvokeMeta.getMethodName().endsWith("toString") && !"class java.lang.String".equals(methodInvokeMeta.getReturnType().toString()))
      logger.info("客户端发送信息参数:{}, 信息返回值类型:{}", methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType());
      ctx.writeAndFlush(methodInvokeMeta);

      }

      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      channelInitializerClient.setResponse(msg);
      }
      }

  3. CustomChannelInitializerClient.java

    Copy
    import com.edu.hart.rpc.entity.MethodInvokeMeta; import com.edu.hart.rpc.entity.NullWritable; import com.edu.hart.rpc.util.ObjectCodec; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

    /**

    • Created by 叶云轩 on 2017/6/16-15:01
    • Concat tdg_yyx@foxmail.com
      */
      public class CustomChannelInitializerClient extends ChannelInitializer {
Copy
private Logger logger = LoggerFactory.getLogger(CustomChannelInitializerClient.class);

private MethodInvokeMeta methodInvokeMeta;

private Object response;

public CustomChannelInitializerClient(MethodInvokeMeta methodInvokeMeta) {
if (!"toString".equals(methodInvokeMeta.getMethodName())) {
logger.info("[CustomChannelInitializerClient] 调用方法名:{},入参:{}, 参数类型:{},返回值类型 {}"
, methodInvokeMeta.getMethodName()
, methodInvokeMeta.getArgs()
, methodInvokeMeta.getParameterTypes()
, methodInvokeMeta.getReturnType());
}
this.methodInvokeMeta = methodInvokeMeta;
}

public Object getResponse() {
if (response instanceof NullWritable) {
return null;
}
return response;
}

public void setResponse(Object response) {
this.response = response;
}

@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2));
pipeline.addLast(new ObjectCodec());
pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this));
}

}

Copy
4. RPCProxyFactoryBean.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.util.WrapMethodUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AbstractFactoryBean;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
* Created by 叶云轩 on 2017/6/16-17:16
* Concat tdg_yyx@foxmail.com
*/
public class RPCProxyFactoryBean extends AbstractFactoryBean implements InvocationHandler {
private Logger logger = LoggerFactory.getLogger(RPCProxyFactoryBean.class);

Copy
private Class interfaceClass;

private NettyClient nettyClient;

@Override
public Class<?> getObjectType() {
return interfaceClass;
}

@Override
protected Object createInstance() throws Exception {
logger.info("[代理工厂] 初始化代理 Bean : {}", interfaceClass);
return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) {
final MethodInvokeMeta methodInvokeMeta = WrapMethodUtils.readMethod(interfaceClass, method, args);
if (!methodInvokeMeta.getMethodName().equals("toString")) {
logger.info("[invoke] 调用接口 {}, 调用方法名:{},入参:{}, 参数类型:{},返回值类型 {}",
methodInvokeMeta.getInterfaceClass(), methodInvokeMeta.getMethodName()
, methodInvokeMeta.getArgs(), methodInvokeMeta.getParameterTypes(), methodInvokeMeta.getReturnType());
}
return nettyClient.remoteCall(methodInvokeMeta, 0);
}

public void setInterfaceClass(Class interfaceClass) {
this.interfaceClass = interfaceClass;
}

public void setNettyClient(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}

}

Copy
至此,netty-client与SpringBoot的集成了算完毕了。同样 ,在netty-client中也要加入相应的依赖

不过上面 server 与 client 使用了一些公共的类和工具。下面也给列举中出来。

#
##### MethodInvokeMeta.java

import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
* 记录调用方法的元信息
* Created by 叶云轩 on 2017/6/7-15:41
* Concat tdg_yyx@foxmail.com
*/
@Component
public class MethodInvokeMeta implements Serializable {

Copy
private static final long serialVersionUID = 8379109667714148890L; // 接口 private Class<?> interfaceClass; // 方法名 private String methodName; // 参数 private Object[] args; // 返回值类型 private Class<?> returnType; // 参数类型 private Class<?>[] parameterTypes;

public Object[] getArgs() {
return args;
}

public void setArgs(Object[] args) {
this.args = args;
}

public Class<?> getInterfaceClass() {
return interfaceClass;
}

public void setInterfaceClass(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Class[] getParameterTypes() {
return parameterTypes;
}

public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}

public Class getReturnType() {
return returnType;
}

public void setReturnType(Class returnType) {
this.returnType = returnType;
}

}

Copy
###### NullWritable.java

import java.io.Serializable;

/**
* 服务器可能返回空的处理
* Created by 叶云轩 on 2017/6/16-16:46
* Concat tdg_yyx@foxmail.com
*/
public class NullWritable implements Serializable {

Copy
private static final long serialVersionUID = -8191640400484155111L; private static NullWritable instance = new NullWritable();

private NullWritable() {
}

public static NullWritable nullWritable() {
return instance;
}

}

Copy
###### ObjectCodec.java

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;

import java.util.List;

public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> {

Copy
@Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) { byte[] data = ObjectSerializerUtils.serilizer(msg); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(data); out.add(buf); }

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
out.add(deSerilizer);
}

}

Copy
###### ObjectSerializerUtils.java

package com.edu.hart.rpc.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;

/**
* 对象序列化工具
*/
public class ObjectSerializerUtils {

Copy
private static final Logger logger = LoggerFactory.getLogger(ObjectSerializerUtils.class);

/**
* 反序列化
*
* @param data
* @return
*/

public static Object deSerilizer(byte[] data) {
if (data != null && data.length > 0) {
try {
ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bis);
return ois.readObject();
} catch (Exception e) {
logger.info("[异常信息] {}", e.getMessage());
e.printStackTrace();
}
return null;
} else {
logger.info("[反序列化] 入参为空");
return null;
}
}

/**
* 序列化对象
*
* @param obj
* @return
*/

public static byte[] serilizer(Object obj) {
if (obj != null) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.flush();
oos.close();
return bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
} else {
return null;
}
}

}

Copy
下面主要是用于Client端的: ###### NettyBeanSacnner.java

import com.edu.hart.rpc.client.RPCProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;

import java.util.List;

/**
* 动态加载代理 bean 到 Spring bean 工厂
*/
public class NettyBeanScanner implements BeanFactoryPostProcessor {

Copy
private DefaultListableBeanFactory beanFactory;

private String basePackage;

private String clientName;

public NettyBeanScanner(String basePackage, String clientName) {
this.basePackage = basePackage;
this.clientName = clientName;
}

/**
* 注册 Bean 到 Spring 的 bean 工厂
*/

public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
this.beanFactory = (DefaultListableBeanFactory) beanFactory;
// 加载远程服务的接口
List<String> resolverClass = PackageClassUtils.resolver(basePackage);
for (String clazz : resolverClass) {
String simpleName;
if (clazz.lastIndexOf('.') != -1) {
simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
} else {
simpleName = clazz;
}
BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RPCProxyFactoryBean.class);
gd.addPropertyValue("interfaceClass", clazz);
gd.addPropertyReference("nettyClient", clientName);
this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
}
}

}

Copy
###### PackageClassUtils.java

这个类要说一下,主要是用来加载 Server 对应的接口的。因为在 Client 中 RPC 接口没有实现类,所以要自己将这些接口加载到 Spring 工厂里面。但是现在有个问题就是需要使用

#
##### SpringBoot 中 application.yml

basePackage: com.edu.hart.rpc.service.login;com.edu.hart.rpc.service.employee;com.edu.hart.rpc.service.authorization;

Copy
** 这样的方式来加载,使用通配符的时候会加载不到,这个问题我还没有解决。**

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* 字节文件加载
*/
public class PackageClassUtils {

Copy
private final static Logger LOGGER = LoggerFactory.getLogger(PackageClassUtils.class);

/**
* 解析包参数
*
* @param basePackage 包名
* @return 包名字符串集合
/

public static List<String> resolver(String basePackage) {
// 以 ";" 分割开多个包名
String[] splitFHs = basePackage.split(";");
List<String> classStrs = new ArrayList<>();
//s: com.yyx.util.

for (String s : splitFHs) {
LOGGER.info("[加载类目录] {}", s);
// 路径中是否存在 "." com.yyx.util.
boolean contains = s.contains(".");
if (contains) {
// 截断星号 com.yyx.util
String filePathStr = s.substring(0, s.lastIndexOf(".
"));
// 组装路径 com/yyx/util
String filePath = filePathStr.replaceAll("\.", "/");
// 获取路径 xxx/classes/com/yyx/util
File file = new File(PackageClassUtils.class.getResource("/").getPath() + "/" + filePath);
// 获取目录下获取文件
getAllFile(filePathStr, file, classStrs);
} else {
String filePath = s.replaceAll("\.", "/");
File file = new File(PackageClassUtils.class.getResource("/").getPath() + "/" + filePath);
classStrs = getClassReferenceList(classStrs, file, s);
}
}
return classStrs;
}

/**
* 添加全限定类名到集合
*
* @param classStrs 集合
* @return 类名集合
*/

private static List<String> getClassReferenceList(List<String> classStrs, File file, String s) {
File[] listFiles = file.listFiles();
if (listFiles != null && listFiles.length != 0) {
for (File file2 : listFiles) {
if (file2.isFile()) {
String name = file2.getName();
String fileName = s + "." + name.substring(0, name.lastIndexOf('.'));
LOGGER.info("[加载完成] 类文件:{}", fileName);
classStrs.add(fileName);
}
}
}
return classStrs;
}

/**
* 获取一个目录下的所有文件
*
* @param s
* @param file
* @param classStrs
*/

private static void getAllFile(String s, File file, List<String> classStrs) {
if (file.isDirectory()) {
File[] files = file.listFiles();
if (files != null)
for (File file1 : files) {
getAllFile(s, file1, classStrs);
}
} else {
String path = file.getPath();
String cleanPath = path.replaceAll("/", ".");
String fileName = cleanPath.substring(cleanPath.indexOf(s), cleanPath.length());
LOGGER.info("[加载完成] 类文件:{}", fileName);
classStrs.add(fileName);
}
}

}

Copy
###### RemoteMethodInvokeUtil.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* 消息处理类
* Created by 叶云轩 on 2017/6/7-15:49
* Concat tdg_yyx@foxmail.com
*/
public class RemoteMethodInvokeUtil implements ApplicationContextAware {

Copy
private ApplicationContext applicationContext;

public Object processMethod(MethodInvokeMeta methodInvokeMeta) throws InvocationTargetException, IllegalAccessException {
Class interfaceClass = methodInvokeMeta.getInterfaceClass();
Object bean = applicationContext.getBean(interfaceClass);
Method[] declaredMethods = interfaceClass.getDeclaredMethods();
Method method = null;
for (Method declaredMethod : declaredMethods) {
if (methodInvokeMeta.getMethodName().equals(declaredMethod.getName())) {
method = declaredMethod;
}
}
Object invoke = method.invoke(bean, methodInvokeMeta.getArgs());
return invoke;
}

@Override
public void setApplicationContext(ApplicationContext app) throws BeansException {
applicationContext = app;
}

}

Copy
###### WrapMethodUtils.java

import com.edu.hart.rpc.entity.MethodInvokeMeta;

import java.lang.reflect.Method;

public class WrapMethodUtils {
/**
* 获取 method 的元数据信息
*
* @param interfaceClass
* @param method
* @param args
* @return
*/
public static MethodInvokeMeta readMethod(Class interfaceClass, Method method, Object[] args) {
MethodInvokeMeta mim = new MethodInvokeMeta();
mim.setInterfaceClass(interfaceClass);
mim.setArgs(args);
mim.setMethodName(method.getName());
mim.setReturnType(method.getReturnType());
Class<?>[] parameterTypes = method.getParameterTypes();
mim.setParameterTypes(parameterTypes);
return mim;
}
}

Copy
------

下面的这些类我也会用在与前台通信时使用:

###### ResponseEnum.java

import java.io.Serializable;

/**

  • 响应码枚举类

  • Created by 叶云轩 on 2017/6/13-11:53

  • Concat tdg_yyx@foxmail.com
    */
    public enum ResponseCodeEnum implements Serializable {

    // region authentication code
    REQUEST_SUCCESS(10000, "请求成功"),
    SERVER_ERROR(99999, "服务器内部错误"),;

    //region 提供对外访问的方法, 无需更改
    /**

    • 响应码
      /
      private Integer code;
      /
      *
    • 响应信息
      */
      private String msg;

    ResponseCodeEnum(Integer code, String msg) {
    this.code = code;
    this.msg = msg;
    }

    public Integer getCode() {
    return code;
    }

    public String getMsg() {
    return msg;
    }

    //endregion
    }

Copy
###### ResponseResult.java

import java.io.Serializable;

/**

  • 数据返回实体封装

  • Created by 叶云轩 on 2017/6/13-11:38

  • Concat tdg_yyx@foxmail.com

  • @param 通用变量
    */
    public class ResponseResult implements Serializable {

    private static final long serialVersionUID = -3411174924856108156L;
    /**

    • 服务器响应码
      /
      private Integer code;
      /
      *
    • 服务器响应说明
      /
      private String msg;
      /
      *
    • 服务器响应数据
      */
      private T data;

    public ResponseResult() {

    }

    @Override
    public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    Copy
    ResponseResult<?> that = (ResponseResult<?>) o;

    return (code != null ? code.equals(that.code) : that.code == null) && (msg != null ? msg.equals(that.msg) : that.msg == null) && (data != null ? data.equals(that.data) : that.data == null);

    }

    public Integer getCode() {

    Copy
    return code;

    }

    public void setCode(Integer code) {
    this.code = code;
    }

    public T getData() {
    return data;
    }

    public void setData(T data) {
    this.data = data;
    }

    public String getMsg() {
    return msg;
    }

    public void setMsg(String msg) {
    this.msg = msg;
    }

    @Override
    public int hashCode() {
    int result = code != null ? code.hashCode() : 0;
    result = 31 * result
    + (msg != null ? msg.hashCode() : 0);
    result = 31 * result + (data != null ? data.hashCode() : 0);
    return result;
    }

    @Override
    public String toString() {
    return "ResponseResult{"
    + "code="
    + code
    + ", msg='"
    + msg
    + '''
    + ", data="
    + data
    + '}';
    }
    }

Copy
###### ResponseResultUtil.java

import com.edu.hart.modules.communicate.ResponseCodeEnum;
import com.edu.hart.modules.communicate.ResponseResult;

/**

  • 返回结果工具类

  • Created by 叶云轩 on 2017/5/29-10:37

  • Concat tdg_yyx@foxmail.com
    */
    public class ResponseResultUtil {

    /**

    • 请求失败返回的数据结构
    • @param responseCodeEnum 返回信息枚举类
    • @return 结果集
      */
      public static ResponseResult error(ResponseCodeEnum responseCodeEnum) {
      ResponseResult ResponseResult = new ResponseResult();
      ResponseResult.setMsg(responseCodeEnum.getMsg());
      ResponseResult.setCode(responseCodeEnum.getCode());
      ResponseResult.setData(null);
      return ResponseResult;
      }

    /**

    • 没有结果集的返回数据结构
    • @return 结果集
      */
      public static ResponseResult success() {
      return success(null);
      }

    /**

    • 成功返回数据结构
    • @param o 返回数据对象
    • @return 返回结果集
      */
      public static ResponseResult success(Object o) {
      ResponseResult responseResult = new ResponseResult();
      responseResult.setMsg(ResponseCodeEnum.REQUEST_SUCCESS.getMsg());
      responseResult.setCode(ResponseCodeEnum.REQUEST_SUCCESS.getCode());
      responseResult.setData(o);
      return responseResult;
      }

    /**

    • 判断是否成功
    • @param responseResult 请求结果
    • @return 判断结果
      */
      public static boolean judgementSuccess(ResponseResult responseResult) {
      return responseResult.getCode().equals(ResponseCodeEnum.REQUEST_SUCCESS.getCode());
      }
      }
Copy
来,我们测试一下远程通信:

1. Client调用Server的一个接口。可以看到在hart-oa项目中,RPCEmployeeService没有任何实现类,控制台中打印了方法的调用 以及入参信息

![Client 调用 Server 的一个接口](http://images2017.cnblogs.com/blog/1161505/201801/1161505-20180129131759375-896850881.png)

1. Server 断点监听到远程调用,CloudApplication 项目为 Server 端,我们可以看到接收到来自 hart-oa 的一个请求,参数一致。在 CloudApplication 中进行相应的处理后,返回到 Client(hart-oa)

![Server 断点监听到远程调用](http://images2017.cnblogs.com/blog/1161505/201801/1161505-20180129131759531-1793044895.png)

1. 返回信息到 Client,可以看到我们(hart-oa)收到了来自 CloudApplication 的响应,结果是我们封装好的 ResponseResult.

![返回信息到 Client](http://images2017.cnblogs.com/blog/1161505/201801/1161505-20180129131804968-429317475.png)


嗯 ~至此整合测试完成。要有想知道与 Spring 整合的,也可以留言或者邮箱:tdg_yyx@foxmail.com 给我~~~

若是想学习Netty的,对上面 代码看不懂的也可以留言或者邮箱:tdg_yyx@foxmail.com给我~~~