`
zy19982004
  • 浏览: 654243 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:249827
社区版块
存档分类
最新评论

Hadoop学习十:Hadoop-Hdfs RPC源码 RPC

 
阅读更多

一.RPC类图

二.详细描述

  1. Server:继承org.apache.hadoop.ipc.Server(Hadoop学习九:Hadoop-hdfs RPC源码 Server)。我们称之为RPC Server。
      /** An RPC Server. */
      public static class Server extends org.apache.hadoop.ipc.Server {
        //创建一个RPC server
        //注意调用父类IP.Server的构造函数时传入的就是Invocation.class
        public Server(Object instance, Configuration conf, String bindAddress,  int port,
                      int numHandlers, boolean verbose, 
                      SecretManager<? extends TokenIdentifier> secretManager) 
            throws IOException {
          super(bindAddress, port, Invocation.class, numHandlers, conf,
              classNameBase(instance.getClass().getName()), secretManager);
        }
    
        //重写ipc.Server的call方法
        public Writable call(Class<?> protocol, Writable param, long receivedTime){
            Invocation call = (Invocation)param;
            if (verbose) log("Call: " + call);
    
            Method method = protocol.getMethod(call.getMethodName(),
            method.setAccessible(true);
            long startTime = System.currentTimeMillis();
            //真正执行远程命令就体现在这里:client的方法最终在server上被执行,就是所谓的rpc
            Object value = method.invoke(instance, call.getParameters());
    
            return new ObjectWritable(method.getReturnType(), value);
        }
      }
  2. Invocation: 我要在远程server上运行一条命令,也就是一个方法,我们把这个方法的方法名,参数,类型封装成一个Invocation对象。
       //每次方法调用都会实例化一个Invocation对象
       //将方法的方法名,参数类型,值封装成一个Invocation对象
      private static class Invocation implements Writable, Configurable {
        private String methodName;
        private Class[] parameterClasses;
        private Object[] parameters;
        private Configuration conf;
    
        public Invocation(Method method, Object[] parameters) {
          this.methodName = method.getName();
          this.parameterClasses = method.getParameterTypes();
          this.parameters = parameters;
        }
      }
  3. Invoker: 继承InvocationHandler,重写了invoke方法,invoke方法里面就是IPC Client向IPC Server发送Invocation。
      //java反射
      private static class Invoker implements InvocationHandler {
        private Client.ConnectionId remoteId;
        private Client client;
        private boolean isClosed = false;
        //只有内部RPC.getProx时调用
        private Invoker(Class<? extends VersionedProtocol> protocol,
            InetSocketAddress address, UserGroupInformation ticket,
            Configuration conf, SocketFactory factory,
            int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
          this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
              ticket, rpcTimeout, connectionRetryPolicy, conf);
          this.client = CLIENTS.getClient(conf, factory);
        }
    
        //重写invoke方法
        //每次方法调用都将封装成Invocation对象,被发往server端
        //返回server端运行此方法的结果
        public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
          ObjectWritable value = (ObjectWritable)
            client.call(new Invocation(method, args), remoteId);//client向server发生消息
          if (logDebug) {
            long callTime = System.currentTimeMillis() - startTime;
            LOG.debug("Call: " + method.getName() + " " + callTime);
          }
          return value.get();
        }
      }
  4. ClientCache:缓存IPC Client对象。
  5. RPC:前两章学习了IPC Client和IPC Server,RPC就是综合了这两者以及java反射机制,为我们提供了许多静态方法,我们只需要调用RPC.*,即可获得代理类,而不需要关心它们是怎么完成的。
    public class RPC {
      private RPC() {}                                  // no public ctor
      
      //相比getProxy,waitForProxy肯定能获得一个代理类VersionedProtocol
      static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol,
                                                   long clientVersion,
                                                   InetSocketAddress addr,
                                                   Configuration conf,
                                                   int rpcTimeout,
                                                   long connTimeout)
                                                   throws IOException { 
            return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
            catch(ConnectException se) {  // namenode has not been started
            LOG.info("Server at " + addr + " not available yet, Zzzzz..."); //你是在卖萌
        }
      }
    
      //获得代理类,剩下的只需用代理类执行命令就行了
      public static VersionedProtocol getProxy(
          Class<? extends VersionedProtocol> protocol,
          long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
          Configuration conf, SocketFactory factory, int rpcTimeout,
          RetryPolicy connectionRetryPolicy) throws IOException {
    
        final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
            rpcTimeout, connectionRetryPolicy);
        //反射获得代理类
        VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[]{protocol}, invoker);
        long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
        //协议检查
        if (serverVersion == clientVersion) {
          return proxy;
        }
      }
    
      //每次发送一组方法调用到指定server
      //并没有经过反射机制
      //而是直接发送方法名,参数类型,值到指定server;并获取server执行这些方法的结果集
      //如果你确实有一组远程命令要执行,并且知道这些命令的的方法名,参数类型,值,并为每个命令指定执行server,那你可以调用此方法
      public static Object[] call(Method method, Object[][] params,
                                  InetSocketAddress[] addrs, 
                                  UserGroupInformation ticket, Configuration conf)
        throws IOException, InterruptedException {
        //实例化Invocation
        Invocation[] invocations = new Invocation[params.length];
        for (int i = 0; i < params.length; i++)
          invocations[i] = new Invocation(method, params[i]);
        Client client = CLIENTS.getClient(conf);
        try {//发生Invocation
        Writable[] wrappedValues = 
          client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf);
        
        if (method.getReturnType() == Void.TYPE) {
          return null;
        }
    
        Object[] values =
          (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
        for (int i = 0; i < values.length; i++)
          if (wrappedValues[i] != null)
            values[i] = ((ObjectWritable)wrappedValues[i]).get();
        
        return values;
        } finally {
          CLIENTS.stopClient(client);
        }
      }
     
      //获得RPC Server
      public static Server getServer(final Object instance, final String bindAddress, final int port,
                                     final int numHandlers,
                                     final boolean verbose, Configuration conf,
                                     SecretManager<? extends TokenIdentifier> secretManager) 
        throws IOException {
        return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
      }
      
    }
     

 

0
0
分享到:
评论

相关推荐

    hadoop段海涛老师八天实战视频

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    Hadoop 培训课程(2)HDFS

    Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- RPC调用** HDFS的分布式存储架构的...

    Hadoop 2.X HDFS源码剖析

    《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...

    HDFS源码剖析带书签目录高清.zip

    《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...

    hdfs源码.zip

    1.2.1 Hadoop RPC接口 4 1.2.2 流式接口 20 1.3 HDFS主要流程 22 1.3.1 HDFS客户端读流程 22 1.3.2 HDFS客户端写流程 24 1.3.3 HDFS客户端追加写流程 25 1.3.4 Datanode启动、心跳以及执行名字节点指令...

    学习hadoop_源代码,RPC_部分

    hadoop rpc 云计算 hdfs mapreduce

    hadoop入门java代码hdfs文件操作 wordCount源码

    hadoop入门级的代码 Java编写 eclipse可运行 包含 hdfs的文件操作 rpc远程调用的简单示例 map-reduce的几个例子:wordcount 学生平均成绩 手机流量统计

    妳那伊抹微笑_云计算之Hadoop完美笔记2.0

    NameNode源码分析(RPC是基础) DataNode源码分析 FileSystem源码分析(如何与NameNode通信ClientProtocol) JobTracker源码分析 TaskTracker源码分析 网站日志分析项目(这个项目分析可以让你更加掌握好所学的知识...

    大纲及下载地址.doc

    02 - 全网最全的Hadoop集群搭建视频 03 - 深度揭秘世界级分布式文件系统 HDFS 架构设计 04 - 老司机带你自研RPC 05 - 老司机带你自研分布式文件系统 06 - 老司机带你自研分布式计算引擎 07 - Hive底层执行引擎深度...

Global site tag (gtag.js) - Google Analytics