[Netty] Netty实现远程调用RPC功能

冻结中 · 发布于 2019-11-25 22:51 · 328 次阅读
142
添加依赖
    io.netty    netty-all    4.1.2.Final    org.reflections    reflections    0.9.10
组织架构

服务端
封装类信息
public class ClassInfo implements Serializable {    private static final long serialVersionUID = 1L;    private String className;  //类名    private String methodName;//方法名    private Class[] types; //参数类型    private Object[] objects;//参数列表    public String getClassName() {        return className;    }    public void setClassName(String className) {        this.className = className;    }    public String getMethodName() {        return methodName;    }    public void setMethodName(String methodName) {        this.methodName = methodName;    }    public Class[] getTypes() {        return types;    }    public void setTypes(Class[] types) {        this.types = types;    }    public Object[] getObjects() {        return objects;    }    public void setObjects(Object[] objects) {        this.objects = objects;    }}
服务端网络处理服务器
public class NettyRPCServer {    private int port;    public NettyRPCServer(int port) {        this.port = port;    }    public void start() {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 128)                    .childOption(ChannelOption.SO_KEEPALIVE, true)                    .localAddress(port).childHandler(                            new ChannelInitializer() {                                @Override                                protected void initChannel(SocketChannel ch) throws Exception {                                    ChannelPipeline pipeline = ch.pipeline();                                    //编码器                                    pipeline.addLast("encoder", new ObjectEncoder());                                    //解码器                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));                                    //服务器端业务处理类                                    pipeline.addLast(new InvokeHandler());                                }                            });            ChannelFuture future = serverBootstrap.bind(port).sync();            System.out.println("......server is ready......");            future.channel().closeFuture().sync();        } catch (Exception e) {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new NettyRPCServer(9999).start();    }}
服务器端业务处理类
public class InvokeHandler extends ChannelInboundHandlerAdapter {    //得到某接口下某个实现类的名字    private String getImplClassName(ClassInfo classInfo) throws Exception{        //服务方接口和实现类所在的包路径        String interfacePath="com.lyz.server";        int lastDot = classInfo.getClassName().lastIndexOf(".");        String interfaceName=classInfo.getClassName().substring(lastDot);        Class superClass=Class.forName(interfacePath+interfaceName);        Reflections reflections = new Reflections(interfacePath);        //得到某接口下的所有实现类        Set ImplClassSet=reflections.getSubTypesOf(superClass);        if(ImplClassSet.size()==0){            System.out.println("未找到实现类");            return null;        }else if(ImplClassSet.size()>1){            System.out.println("找到多个实现类,未明确使用哪一个");            return null;        }else {            //把集合转换为数组            Class[] classes=ImplClassSet.toArray(new Class[0]);            return classes[0].getName(); //得到实现类的名字        }    }    @Override  //读取客户端发来的数据并通过反射调用实现类的方法    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ClassInfo classInfo = (ClassInfo) msg;        System.out.println(classInfo);        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();        Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());        //通过反射调用实现类的方法        Object result = method.invoke(clazz, classInfo.getObjects());        ctx.writeAndFlush(result);    }}
服务端接口及实现类
// 无参接口public interface HelloNetty {    String hello();}// 实现类public class HelloNettyImpl implements HelloNetty {    @Override    public String hello() {        return "hello,netty";    }}// 带参接口public interface HelloRPC {    String hello(String name);}// 实现类public class HelloRPCImpl implements HelloRPC {    @Override    public String hello(String name) {        return "hello," + name;    }}
客户端
代理类
public class NettyRPCProxy {    //根据接口创建代理对象    public static Object create(Class target) {        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {            @Override            public Object invoke(Object proxy, Method method, Object[] args)                    throws Throwable {                //封装ClassInfo                ClassInfo classInfo = new ClassInfo();                classInfo.setClassName(target.getName());                classInfo.setMethodName(method.getName());                classInfo.setObjects(args);                classInfo.setTypes(method.getParameterTypes());                //开始用Netty发送数据                EventLoopGroup group = new NioEventLoopGroup();                ResultHandler resultHandler = new ResultHandler();                try {                    Bootstrap b = new Bootstrap();                    b.group(group)                            .channel(NioSocketChannel.class)                            .handler(new ChannelInitializer() {                                @Override                                public void initChannel(SocketChannel ch) throws Exception {                                    ChannelPipeline pipeline = ch.pipeline();                                    //编码器                                    pipeline.addLast("encoder", new ObjectEncoder());                                    //解码器  构造方法第一个参数设置二进制数据的最大字节数  第二个参数设置具体使用哪个类解析器                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));                                    //客户端业务处理类                                    pipeline.addLast("handler", resultHandler);                                }                            });                    ChannelFuture future = b.connect("127.0.0.1", 9999).sync();                    future.channel().writeAndFlush(classInfo).sync();                    future.channel().closeFuture().sync();                } finally {                    group.shutdownGracefully();                }                return resultHandler.getResponse();            }        });    }}
客户端业务处理类
public class ResultHandler extends ChannelInboundHandlerAdapter {    private Object response;    public Object getResponse() {        return response;    }    @Override //读取服务器端返回的数据(远程调用的结果)    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        response = msg;        ctx.close();    }}
客户端接口
// 无参接口public interface HelloNetty {    String hello();}// 带参接口public interface HelloRPC {    String hello(String name);}
测试类 服务调用方
public class TestNettyRPC {    public static void main(String [] args){        //第1次远程调用        HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);        System.out.println(helloNetty.hello());        //第2次远程调用        HelloRPC helloRPC =  (HelloRPC) NettyRPCProxy.create(HelloRPC.class);        System.out.println(helloRPC.hello("RPC"));    }}
输出结果
服务端
......server is ready......com.lyz.serverStub.ClassInfo@2b894733com.lyz.serverStub.ClassInfo@167bfa9
客户端
hello,nettyhello,RPC
下一篇通过netty实现线上聊天功能

               
来源:https://www.cnblogs.com/lyze/p/11803073.html
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
共收到 0 条回复
暂无回复。
回帖
B Color Image Link Quote Code Smilies
Command + Enter
快速回复 返回顶部 返回列表