一.RPC类图
二.详细描述
- Server:继承org.apache.hadoop.ipc.Server(Hadoop学习九:Hadoop-hdfs RPC源码 Server)。我们称之为RPC Server。
/** An RPC Server. */ public static class Server extends org.apache.hadoop.ipc.Server { //创建一个RPC server //注意调用父类IP.Server的构造函数时传入的就是Invocation.class public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()), secretManager); } //重写ipc.Server的call方法 public Writable call(Class<?> protocol, Writable param, long receivedTime){ Invocation call = (Invocation)param; if (verbose) log("Call: " + call); Method method = protocol.getMethod(call.getMethodName(), method.setAccessible(true); long startTime = System.currentTimeMillis(); //真正执行远程命令就体现在这里:client的方法最终在server上被执行,就是所谓的rpc Object value = method.invoke(instance, call.getParameters()); return new ObjectWritable(method.getReturnType(), value); } }
- Invocation: 我要在远程server上运行一条命令,也就是一个方法,我们把这个方法的方法名,参数,类型封装成一个Invocation对象。
//每次方法调用都会实例化一个Invocation对象 //将方法的方法名,参数类型,值封装成一个Invocation对象 private static class Invocation implements Writable, Configurable { private String methodName; private Class[] parameterClasses; private Object[] parameters; private Configuration conf; public Invocation(Method method, Object[] parameters) { this.methodName = method.getName(); this.parameterClasses = method.getParameterTypes(); this.parameters = parameters; } }
- Invoker: 继承InvocationHandler,重写了invoke方法,invoke方法里面就是IPC Client向IPC Server发送Invocation。
//java反射 private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; //只有内部RPC.getProx时调用 private Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf); this.client = CLIENTS.getClient(conf, factory); } //重写invoke方法 //每次方法调用都将封装成Invocation对象,被发往server端 //返回server端运行此方法的结果 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);//client向server发生消息 if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } }
- ClientCache:缓存IPC Client对象。
- RPC:前两章学习了IPC Client和IPC Server,RPC就是综合了这两者以及java反射机制,为我们提供了许多静态方法,我们只需要调用RPC.*,即可获得代理类,而不需要关心它们是怎么完成的。
public class RPC { private RPC() {} // no public ctor //相比getProxy,waitForProxy肯定能获得一个代理类VersionedProtocol static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, long connTimeout) throws IOException { return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); catch(ConnectException se) { // namenode has not been started LOG.info("Server at " + addr + " not available yet, Zzzzz..."); //你是在卖萌 } } //获得代理类,剩下的只需用代理类执行命令就行了 public static VersionedProtocol getProxy( Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); //反射获得代理类 VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker); long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); //协议检查 if (serverVersion == clientVersion) { return proxy; } } //每次发送一组方法调用到指定server //并没有经过反射机制 //而是直接发送方法名,参数类型,值到指定server;并获取server执行这些方法的结果集 //如果你确实有一组远程命令要执行,并且知道这些命令的的方法名,参数类型,值,并为每个命令指定执行server,那你可以调用此方法 public static Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) throws IOException, InterruptedException { //实例化Invocation Invocation[] invocations = new Invocation[params.length]; for (int i = 0; i < params.length; i++) invocations[i] = new Invocation(method, params[i]); Client client = CLIENTS.getClient(conf); try {//发生Invocation Writable[] wrappedValues = client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf); if (method.getReturnType() == Void.TYPE) { return null; } Object[] values = (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); for (int i = 0; i < values.length; i++) if (wrappedValues[i] != null) values[i] = ((ObjectWritable)wrappedValues[i]).get(); return values; } finally { CLIENTS.stopClient(client); } } //获得RPC Server public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); } }
相关推荐
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- RPC调用** HDFS的分布式存储架构的...
《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...
《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...
1.2.1 Hadoop RPC接口 4 1.2.2 流式接口 20 1.3 HDFS主要流程 22 1.3.1 HDFS客户端读流程 22 1.3.2 HDFS客户端写流程 24 1.3.3 HDFS客户端追加写流程 25 1.3.4 Datanode启动、心跳以及执行名字节点指令...
hadoop rpc 云计算 hdfs mapreduce
hadoop入门级的代码 Java编写 eclipse可运行 包含 hdfs的文件操作 rpc远程调用的简单示例 map-reduce的几个例子:wordcount 学生平均成绩 手机流量统计
NameNode源码分析(RPC是基础) DataNode源码分析 FileSystem源码分析(如何与NameNode通信ClientProtocol) JobTracker源码分析 TaskTracker源码分析 网站日志分析项目(这个项目分析可以让你更加掌握好所学的知识...
02 - 全网最全的Hadoop集群搭建视频 03 - 深度揭秘世界级分布式文件系统 HDFS 架构设计 04 - 老司机带你自研RPC 05 - 老司机带你自研分布式文件系统 06 - 老司机带你自研分布式计算引擎 07 - Hive底层执行引擎深度...