博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop RPC源码阅读-客户端
阅读量:4499 次
发布时间:2019-06-08

本文共 13078 字,大约阅读时间需要 43 分钟。

Hadoop版本Hadoop2.6

RPC主要分为3个部分:(2)客户端

(2)客户端

先展示RPC客户端实例代码

public class LoginController {    public static void main(String[] args) throws IOException {      //获取RPC LoginServiceInterface协议接口的代理对象        LoginServiceInterface proxy= RPC.getProxy(LoginServiceInterface.class,1L,new InetSocketAddress("localhost",10000),new Configuration());        String msg=proxy.login("xiaoming","123123");        System.out.println(msg);    }}

(1)进入上述的RPC.getProxy方法,会发现是通过获取RpcEngine接口(默认实现是WritableRpcEngine),利用WritableRpcEngine的getProxy方法获取Proxy代理,如下所示

public 
ProtocolProxy
getProxy(Class
protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException { if (connectionRetryPolicy != null) { throw new UnsupportedOperationException( "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); } T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, fallbackToSimpleAuth)); return new ProtocolProxy
(protocol, proxy, true); }

(2)上述就是客户端获取代理的过程,但是其中是如何从服务端获取通过动态代理类Invoker实现,并将代理封装成ProtocolProxy类,在本文上述的例子中,该ProtocolProxy类没有干什么,只是通过getProxy()方法将封装的代理返回给客户端

那么我们接着分析动态代理类Invoker

Invoker成员有Clinet类,并且全局变量ClientCache对Client进行缓存。

动态代理类Invoker在代理对象发送请求时会自动执行invoke()方法,如下所示:

public Object invoke(Object proxy, Method method, Object[] args)      throws Throwable {      long startTime = 0;      if (LOG.isDebugEnabled()) {        startTime = Time.now();      }      TraceScope traceScope = null;      if (Trace.isTracing()) {        traceScope = Trace.startSpan(            method.getDeclaringClass().getCanonicalName() +            "." + method.getName());      }      ObjectWritable value;      try {        value = (ObjectWritable)          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),            remoteId, fallbackToSimpleAuth);      } finally {        if (traceScope != null) traceScope.close();      }      if (LOG.isDebugEnabled()) {        long callTime = Time.now() - startTime;        LOG.debug("Call: " + method.getName() + " " + callTime);      }      return value.get();    }

3、上述中动态代理通过client.call方法向服务器发送请求获取返回值。

我们还看到Invocation类封装了方法和参数,Invocation通过实现Writable实现序列化,方便数据在网络中传输,作为数据传输层,相当于VO。

因此我们接着进入Clinet类,查看call方法干了什么。

首先我们先看看Client类的结构,Client类包含了几个内部类:

Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据

Connection :用以处理远程连接对象。继承了Thread
ConnectionId :唯一确定一个连接

Client类中call()方法如下所示:

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,      ConnectionId remoteId, int serviceClass,      AtomicBoolean fallbackToSimpleAuth) throws IOException {    final Call call = createCall(rpcKind, rpcRequest);//将传入的数据封装成call对象    Connection connection = getConnection(remoteId, call, serviceClass,      fallbackToSimpleAuth);//获得一个连接      try {      connection.sendRpcRequest(call);                 // send the rpc request向服务端发送call对象    } catch (RejectedExecutionException e) {      throw new IOException("connection has been closed", e);    } catch (InterruptedException e) {      Thread.currentThread().interrupt();      LOG.warn("interrupted waiting to send rpc request to server", e);      throw new IOException(e);    }    boolean interrupted = false;    synchronized (call) {      while (!call.done) {        try {          call.wait();                           // wait for the result        } catch (InterruptedException ie) {          // save the fact that we were interrupted          interrupted = true;        }      }      if (interrupted) {        // set the interrupt flag now that we are done waiting        Thread.currentThread().interrupt();      }      if (call.error != null) {        if (call.error instanceof RemoteException) {          call.error.fillInStackTrace();          throw call.error;        } else { // local exception          InetSocketAddress address = connection.getRemoteAddress();          throw NetUtils.wrapException(address.getHostName(),                  address.getPort(),                  NetUtils.getHostname(),                  0,                  call.error);        }      } else {        return call.getRpcResponse();      }    }  }

 4、从上述可以看到,rpcRequest是将方法和参数封装后的可序列号的对象,当做请求参数发送给服务端。

在上述方法中主要使用了两个类Call和Connection.

Call:封装了与服务端请求的状态,包括:

final int id;               // call id该请求连接ID    final int retry;           // retry count该请求重试次数    final Writable rpcRequest;  // the serialized rpc request该请求参数    Writable rpcResponse;       // null if rpc has error该请求的返回值    IOException error;          // exception, null if success该请求成功标示    final RPC.RpcKind rpcKind;      // Rpc EngineKind使用RpcEngine的类型    boolean done;               // true when call is done该请求完成标示

Connection则是实现了与服务端建立连接,发送请求,获取数据等功能。

5、Connection类解析

Connection类继承线程类Thread.

 从3步可以看到在Clinet的call()方法通过getConnection()方法获取Connection,如下所示:

可以看出Client使用connections对客户端每一个connection进行缓存,

并通过setupIOstreams()方法与服务器建立Socket连接,并创建输入输出流connection.in,connection.out,

并通过start()方法启动该线程也就是运行Connection类的run()方法,等待服务端传回数据。

因此Connection类主要通过run()方法接受数据,通过sendRpcRequest()向服务端发送请求。

private Connection getConnection(ConnectionId remoteId,      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)      throws IOException {    if (!running.get()) {      // the client is stopped      throw new IOException("The client is stopped");    }    Connection connection;    /* we could avoid this allocation for each RPC by having a       * connectionsId object and with set() method. We need to manage the     * refs for keys in HashMap properly. For now its ok.     */    do {      synchronized (connections) {        connection = connections.get(remoteId);        if (connection == null) {          connection = new Connection(remoteId, serviceClass);          connections.put(remoteId, connection);        }      }    } while (!connection.addCall(call));        //we don't invoke the method below inside "synchronized (connections)"    //block above. The reason for that is if the server happens to be slow,    //it will take longer to establish a connection and that will slow the    //entire system down.    connection.setupIOstreams(fallbackToSimpleAuth);    return connection;  }

5.1 Connection 的sendRpcRequest()向服务端发送请求

public void sendRpcRequest(final Call call)        throws InterruptedException, IOException {      if (shouldCloseConnection.get()) {        return;      }      // Serialize the call to be sent. This is done from the actual      // caller thread, rather than the sendParamsExecutor thread,            // so that if the serialization throws an error, it is reported      // properly. This also parallelizes the serialization.      //      // Format of a call on the wire:      // 0) Length of rest below (1 + 2)      // 1) RpcRequestHeader  - is serialized Delimited hence contains length      // 2) RpcRequest      //      // Items '1' and '2' are prepared here.       final DataOutputBuffer d = new DataOutputBuffer();      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,          clientId);      header.writeDelimitedTo(d);      call.rpcRequest.write(d);      synchronized (sendRpcRequestLock) {        Future
senderFuture = sendParamsExecutor.submit(new Runnable() { @Override public void run() { try { synchronized (Connection.this.out) { if (shouldCloseConnection.get()) { return; } if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); byte[] data = d.getData(); int totalLength = d.getLength(); out.writeInt(totalLength); // Total Length out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest out.flush(); } } catch (IOException e) { // exception at this point would leave the connection in an // unrecoverable state (eg half a call left on the wire). // So, close the connection, killing any outstanding calls markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early IOUtils.closeStream(d); } } }); try { senderFuture.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); // cause should only be a RuntimeException as the Runnable above // catches IOException if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { throw new RuntimeException("unexpected checked exception", cause); } } } }

 

5.2 Connection 的run()获取服务端返回的数据

可以看到通过receiveRpcResponse()方法通过之前建立的输入流in获取服务器传来的数据,并将数据value传给call数据对象call.setRpcResponse(value);,

在call.setRpcResponse(value)方法中通过callComplete()将call数据对象设置成已完成,并通过notify()唤醒该call对象。

在Client的call()方法中,检测到call对象已完成后,就将call对象中的响应数据返回给调用者。

至此,一个完整的RPC远程过程调用的过程就完成了。

public void run() {      if (LOG.isDebugEnabled())        LOG.debug(getName() + ": starting, having connections "             + connections.size());      try {        while (waitForWork()) {
//wait here for work - read or close connection循环等待获取服务端数据 receiveRpcResponse();//获取服务端数据的具体实现 } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don't leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
private void receiveRpcResponse() {      if (shouldCloseConnection.get()) {        return;      }      touch();            try {        int totalLen = in.readInt();        RpcResponseHeaderProto header =             RpcResponseHeaderProto.parseDelimitedFrom(in);        checkResponse(header);        int headerLen = header.getSerializedSize();        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);        int callId = header.getCallId();        if (LOG.isDebugEnabled())          LOG.debug(getName() + " got value #" + callId);        Call call = calls.get(callId);        RpcStatusProto status = header.getStatus();        if (status == RpcStatusProto.SUCCESS) {          Writable value = ReflectionUtils.newInstance(valueClass, conf);          value.readFields(in);                 // read value          calls.remove(callId);          call.setRpcResponse(value);                    // verify that length was correct          // only for ProtobufEngine where len can be verified easily          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {            ProtobufRpcEngine.RpcWrapper resWrapper =                 (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();            if (totalLen != headerLen + resWrapper.getLength()) {               throw new RpcClientException(                  "RPC response length mismatch on rpc success");            }          }        } else { // Rpc Request failed          // Verify that length was correct          if (totalLen != headerLen) {            throw new RpcClientException(                "RPC response length mismatch on rpc error");          }                    final String exceptionClassName = header.hasExceptionClassName() ?                header.getExceptionClassName() :                   "ServerDidNotSetExceptionClassName";          final String errorMsg = header.hasErrorMsg() ?                 header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;          final RpcErrorCodeProto erCode =                     (header.hasErrorDetail() ? header.getErrorDetail() : null);          if (erCode == null) {             LOG.warn("Detailed error code not set by server on rpc error");          }          RemoteException re =               ( (erCode == null) ?                   new RemoteException(exceptionClassName, errorMsg) :              new RemoteException(exceptionClassName, errorMsg, erCode));          if (status == RpcStatusProto.ERROR) {            calls.remove(callId);            call.setException(re);          } else if (status == RpcStatusProto.FATAL) {            // Close the connection            markClosed(re);          }        }      } catch (IOException e) {        markClosed(e);      }    }

 

转载于:https://www.cnblogs.com/arbitrary/p/5628737.html

你可能感兴趣的文章
C#中删除字符串最后一个字符的几种方法
查看>>
远程管理控制ssh
查看>>
Django框架之ORM(数据库)操作
查看>>
解决:Cannot find ContentTypeReader HeightmapCollision.HeightMapInfoReader
查看>>
内存映射(Linux设备驱动程序)
查看>>
bzoj 5072
查看>>
[Luogu] 矩阵加速(数列)
查看>>
[LeetCode] Design Circular Queue 设计环形队列
查看>>
运动回调-链式运动
查看>>
素数---小修改1
查看>>
linux shell常用快捷键(转载)
查看>>
Ajax 跨域请求
查看>>
spring test---測试SpringMvc初识
查看>>
信息加密之消息摘要算法的MAC
查看>>
Docker 组件如何协作?- 每天5分钟玩转容器技术(8)
查看>>
js时间日期处理
查看>>
161117、使用spring声明式事务抛出 identifier of an instance of
查看>>
解决IE6-IE7下li上下间距
查看>>
勇于担当:好男人的三块责任田——
查看>>
小组冲刺第三天
查看>>