加入收藏 | 设为首页 | 会员中心 | 我要投稿 台州站长网 (https://www.0576zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 编程要点 > 语言 > 正文

Java从零开展手写RPC—Reflect反射实现通用调用之服务端

发布时间:2021-11-04 10:06:32 所属栏目:语言 来源:互联网
导读:前面我们的例子是一个固定的出参和入参,固定的方法实现。 本节将实现通用的调用,让框架具有更广泛的实用性。 基本思路 所有的方法调用,基于反射进行相关处理实现。 java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端 服务端 核心类 RpcServer
    前面我们的例子是一个固定的出参和入参,固定的方法实现。   本节将实现通用的调用,让框架具有更广泛的实用性。   基本思路 所有的方法调用,基于反射进行相关处理实现。   java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端 服务端 核心类 RpcServer 调整如下:   serverBootstrap.group(workerGroup, bossGroup)      .channel(NioServerSocketChannel.class)      // 打印日志      .handler(new LoggingHandler(LogLevel.INFO))      .childHandler(new ChannelInitializer<Channel>() {          @Override          protected void initChannel(Channel ch) throws Exception {              ch.pipeline()              // 解码 bytes=>resp              .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))               // request=>bytes               .addLast(new ObjectEncoder())               .addLast(new RpcServerHandler());          }      })      // 这个参数影响的是还没有被accept 取出的连接      .option(ChannelOption.SO_BACKLOG, 128)      // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。      .childOption(ChannelOption.SO_KEEPALIVE, true);  其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。   RpcServerHandler package com.github.houbb.rpc.server.handler;      import com.github.houbb.log.integration.core.Log;  import com.github.houbb.log.integration.core.LogFactory;  import com.github.houbb.rpc.common.rpc.domain.RpcRequest;  import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;  import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;  import io.netty.channel.ChannelHandlerContext;  import io.netty.channel.SimpleChannelInboundHandler;      /**   * @author binbin.hou   * @since 0.0.1   */  public class RpcServerHandler extends SimpleChannelInboundHandler {          private static final Log log = LogFactory.getLog(RpcServerHandler.class);          @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception {          final String id = ctx.channel().id().asLongText();          log.info("[Server] channel {} connected " + id);      }          @Override      protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {          final String id = ctx.channel().id().asLongText();          log.info("[Server] channel read start: {}", id);              // 接受客户端请求          RpcRequest rpcRequest = (RpcRequest)msg;          log.info("[Server] receive channel {} request: {}", id, rpcRequest);              // 回写到 client 端          DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);          ctx.writeAndFlush(rpcResponse);          log.info("[Server] channel {} response {}", id, rpcResponse);      }          @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          cause.printStackTrace();          ctx.close();      }          /**       * 处理请求信息       * @param rpcRequest 请求信息       * @return 结果信息       * @since 0.0.6       */      private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {          DefaultRpcResponse rpcResponse = new DefaultRpcResponse();          rpcResponse.seqId(rpcRequest.seqId());              try {              // 获取对应的 service 实现类              // rpcRequest=>invocationRequest              // 执行 invoke              Object result = DefaultServiceFactory.getInstance()                      .invoke(rpcRequest.serviceId(),                              rpcRequest.methodName(),                              rpcRequest.paramTypeNames(),                              rpcRequest.paramValues());              rpcResponse.result(result);          } catch (Exception e) {              rpcResponse.error(e);              log.error("[Server] execute meet ex for request", rpcRequest, e);          }              // 构建结果值          return rpcResponse;      }      }  和以前类似,不过 handleRpcRequest 要稍微麻烦一点。   这里需要根据发射,调用对应的方法。   pojo 其中使用的出参、入参实现如下:   RpcRequest package com.github.houbb.rpc.common.rpc.domain;      import java.util.List;      /**   * 序列化相关处理   * (1)调用创建时间-createTime   * (2)调用方式 callType   * (3)超时时间 timeOut   *   * 额外信息:   * (1)上下文信息   *   * @author binbin.hou   * @since 0.0.6   */  public interface RpcRequest extends BaseRpc {          /**       * 创建时间       * @return 创建时间       * @since 0.0.6       */      long createTime();          /**       * 服务唯一标识       * @return 服务唯一标识       * @since 0.0.6       */      String serviceId();          /**       * 方法名称       * @return 方法名称       * @since 0.0.6       */      String methodName();          /**       * 方法类型名称列表       * @return 名称列表       * @since 0.0.6       */      List<String> paramTypeNames();          // 调用参数信息列表          /**       * 调用参数值       * @return 参数值数组       * @since 0.0.6       */      Object[] paramValues();      }  RpcResponse package com.github.houbb.rpc.common.rpc.domain;      /**   * 序列化相关处理   * @author binbin.hou   * @since 0.0.6   */  public interface RpcResponse extends BaseRpc {          /**       * 异常信息       * @return 异常信息       * @since 0.0.6       */      Throwable error();          /**       * 请求结果       * @return 请求结果       * @since 0.0.6       */      Object result();      }  BaseRpc package com.github.houbb.rpc.common.rpc.domain;      import java.io.Serializable;      /**   * 序列化相关处理   * @author binbin.hou   * @since 0.0.6   */  public interface BaseRpc extends Serializable {          /**       * 获取唯一标识号       * (1)用来唯一标识一次调用,便于获取该调用对应的响应信息。       * @return 唯一标识号       */      String seqId();          /**       * 设置唯一标识号       * @param traceId 唯一标识号       * @return this       */      BaseRpc seqId(final String traceId);      }  ServiceFactory-服务工厂 为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。   ServiceFactory package com.github.houbb.rpc.server.service;      import com.github.houbb.rpc.server.config.service.ServiceConfig;  import com.github.houbb.rpc.server.registry.ServiceRegistry;      import java.util.List;      /**   * 服务方法类仓库管理类-接口   *   *   * (1)对外暴露的方法,应该尽可能的少。   * (2)对于外部的调用,后期比如 telnet 治理,可以使用比如有哪些服务列表?   * 单个服务有哪些方法名称?   *   * 等等基础信息的查询,本期暂时全部隐藏掉。   *   * (3)前期尽可能的少暴露方法。   * @author binbin.hou   * @since 0.0.6   * @see ServiceRegistry 服务注册,将服务信息放在这个类中,进行统一的管理。   * @see ServiceMethod 方法信息   */  public interface ServiceFactory {          /**       * 注册服务列表信息       * @param serviceConfigList 服务配置列表       * @return this       * @since 0.0.6       */      ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList);          /**       * 直接反射调用       * (1)此处对于方法反射,为了提升性能,所有的 class.getFullName() 进行拼接然后放进 key 中。       *       * @param serviceId 服务名称       * @param methodName 方法名称       * @param paramTypeNames 参数类型名称列表       * @param paramValues 参数值       * @return 方法调用返回值       * @since 0.0.6       */      Object invoke(final String serviceId, final String methodName,                    List<String> paramTypeNames, final Object[] paramValues);      }  DefaultServiceFactory 作为默认实现,如下:   package com.github.houbb.rpc.server.service.impl;      import com.github.houbb.heaven.constant.PunctuationConst;  import com.github.houbb.heaven.util.common.ArgUtil;  import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;  import com.github.houbb.heaven.util.util.CollectionUtil;  import com.github.houbb.rpc.common.exception.RpcRuntimeException;  import com.github.houbb.rpc.server.config.service.ServiceConfig;  import com.github.houbb.rpc.server.service.ServiceFactory;      import java.lang.reflect.InvocationTargetException;  import java.lang.reflect.Method;  import java.util.HashMap;  import java.util.List;  import java.util.Map;      /**   * 默认服务仓库实现   * @author binbin.hou   * @since 0.0.6   */  public class DefaultServiceFactory implements ServiceFactory {          /**       * 服务 map       * @since 0.0.6       */      private Map<String, Object> serviceMap;          /**       * 直接获取对应的 method 信息       * (1)key: serviceId:methodName:param1@param2@param3       * (2)value: 对应的 method 信息       */      private Map<String, Method> methodMap;          private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();          private DefaultServiceFactory(){}          public static DefaultServiceFactory getInstance() {          return INSTANCE;      }          /**       * 服务注册一般在项目启动的时候,进行处理。       * 属于比较重的操作,而且一个服务按理说只应该初始化一次。       * 此处加锁为了保证线程安全。       * @param serviceConfigList 服务配置列表       * @return this       */      @Override      public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {          ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");              // 集合初始化          serviceMap = new HashMap<>(serviceConfigList.size());          // 这里只是预估,一般为2个服务。          methodMap = new HashMap<>(serviceConfigList.size()*2);              for(ServiceConfig serviceConfig : serviceConfigList) {              serviceMap.put(serviceConfig.id(), serviceConfig.reference());          }              // 存放方法名称          for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {              String serviceId = entry.getKey();              Object reference = entry.getValue();                  //获取所有方法列表              Method[] methods = reference.getClass().getMethods();              for(Method method : methods) {                  String methodName = method.getName();                  if(ReflectMethodUtil.isIgnoreMethod(methodName)) {                      continue;                  }                      List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);                  String key = buildMethodKey(serviceId, methodName, paramTypeNames);                  methodMap.put(key, method);              }          }              return this;      }              @Override      public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {          //参数校验          ArgUtil.notEmpty(serviceId, "serviceId");          ArgUtil.notEmpty(methodName, "methodName");              // 提供 cache,可以根据前三个值快速定位对应的 method          // 根据 method 进行反射处理。          // 对于 paramTypes 进行 string 连接处理。          final Object reference = serviceMap.get(serviceId);          final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);          final Method method = methodMap.get(methodKey);              try {              return method.invoke(reference, paramValues);          } catch (IllegalAccessException | InvocationTargetException e) {              throw new RpcRuntimeException(e);          }      }          /**       * (1)多个之间才用 : 分隔       * (2)参数之间采用 @ 分隔       * @param serviceId 服务标识       * @param methodName 方法名称       * @param paramTypeNames 参数类型名称       * @return 构建完整的 key       * @since 0.0.6       */      private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) {          String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);          return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON                  +param;      }      }  ServiceRegistry-服务注册类 接口 package com.github.houbb.rpc.server.registry;      /**   * 服务注册类   * (1)每个应用唯一   * (2)每个服务的暴露协议应该保持一致   * 暂时不提供单个服务的特殊处理,后期可以考虑添加   *   * @author binbin.hou   * @since 0.0.6   */  public interface ServiceRegistry {          /**       * 暴露的 rpc 服务端口信息       * @param port 端口信息       * @return this       * @since 0.0.6       */      ServiceRegistry port(final int port);          /**       * 注册服务实现       * @param serviceId 服务标识       * @param serviceImpl 服务实现       * @return this       * @since 0.0.6       */      ServiceRegistry register(final String serviceId, final Object serviceImpl);          /**       * 暴露所有服务信息       * (1)启动服务端       * @return this       * @since 0.0.6       */      ServiceRegistry expose();      }

(编辑:台州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读