HBase 代码阅读笔记-1 - PUT-3-提交任务2(基于 0.94.12)
上一篇梳理了提交任务的主流程,还遗留了建立连接、发送请求、处理响应三个核心流程,今天接着分析。
首先会创建一个 Call 对象。Call 是 HBaseClient 的一个内部类,代码很简单,直接贴出来:
protected class Call { final int id; // call id final Writable param; // parameter Writable value; // value, null if error IOException error; // exception, null if value boolean done; // true when call is done long startTime; protected Call(Writable param) { this.param = param; this.startTime = System.currentTimeMillis(); synchronized (HBaseClient.this) { this.id = counter++; } } /** * Indicate when the call is complete and the * value or error are available. Notifies by default. */ protected synchronized void callComplete() { this.done = true; notify(); // notify caller } /** * Set the exception when there is an error. * Notify the caller the call is done. * * @param error exception thrown by the call; either local or remote */ public synchronized void setException(IOException error) { this.error = error; callComplete(); } /** * Set the return value when there is no error. * Notify the caller the call is done. * * @param value return value of the call. */ public synchronized void setValue(Writable value) { this.value = value; callComplete(); } public long getStartTime() { return this.startTime; } }
protected Connection getConnection(InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout, Call call) throws IOException, InterruptedException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } Connection connection; /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); synchronized (connections) { connection = connections.get(remoteId);//这里用CHM做了一个缓存 if (connection == null) { connection = createConnection(remoteId); connections.put(remoteId, connection);//如果没有连接则创建一个 } } connection.addCall(call); //we don't invoke the method below inside "synchronized (connections)" //block above. The reason for that is if the server happens to be slow, //it will take longer to establish a connection and that will slow the //entire system down. //Moreover, if the connection is currently created, there will be many threads // waiting here; as setupIOstreams is synchronized. If the connection fails with a // timeout, they will all fail simultaneously. This is checked in setupIOstreams. // 这里讲述了为什么setupIOstream方法不放在同步块里,因为这是一个同步操作,如果某一个服务端 // 变慢的话,这样会拖死整个系统。即便连上了,该方法检查的超时异常也会导致灯带同步块的线程都失效 connection.setupIOstreams(); return connection; }
/** * Connect to the server and set up the I/O streams. It then sends * a header to the server and starts * the connection thread that waits for responses. * * @throws java.io.IOException e */ protected synchronized void setupIOstreams() throws IOException, InterruptedException { if (socket != null || shouldCloseConnection.get()) { return; }//检查连接是否已经关闭 if (failedServers.isFailedServer(remoteId.getAddress())) { if (LOG.isDebugEnabled()) { LOG.debug("Not trying to connect to " + remoteId.getAddress() + " this server is in the failed servers list"); } IOException e = new FailedServerException( "This server is in the failed servers list: " + remoteId.getAddress()); markClosed(e); close(); throw e; }//检查连接指向的主机是不是已经失效 try { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to " + remoteId); } setupConnection();//这里是建立连接 this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(NetUtils.getInputStream(socket)))); this.out = new DataOutputStream (new BufferedOutputStream(NetUtils.getOutputStream(socket))); writeHeader();//初次创建连接需要发送同步头 // update last activity time touch(); // start the receiver thread after the socket connection has been set up start();//开始执行 } catch (Throwable t) { failedServers.addToFailedServers(remoteId.address); IOException e; if (t instanceof IOException) { e = (IOException) t; } else { e = new IOException("Could not set up IO Streams", t); } markClosed(e); close(); throw e; } }
/**
 * Opens the socket to {@code remoteId}'s address, retrying on failure.
 *
 * <p>Each attempt does the following:
 * <ul>
 *   <li>creates a socket from {@code socketFactory};</li>
 *   <li>applies the TCP_NODELAY and keep-alive settings;</li>
 *   <li>connects with the timeout returned by {@code getSocketTimeout(conf)};</li>
 *   <li>sets the socket read timeout to {@code pingInterval}, which is first
 *       replaced by the RPC timeout when one is configured.</li>
 * </ul>
 *
 * <p>Connect failures are passed to {@code handleConnectionFailure}, with
 * separate counters for timeouts and for other I/O errors.
 *
 * <p>NOTE(review): the loop only exits via {@code return} or an exception.
 * Presumably {@code handleConnectionFailure} throws once the failure count
 * reaches {@code maxRetries}; confirm, otherwise retries would never end.
 */
protected synchronized void setupConnection() throws IOException {
  short ioFailures = 0;
  short timeoutFailures = 0;
  while (true) {
    try {
      this.socket = socketFactory.createSocket();
      // Disable/enable Nagle's algorithm. NOTE(review): per the original
      // reading notes, tcpNoDelay comes from hbase.ipc.client.tcpnodelay,
      // defaults to false, and is absent from the shipped config file. The
      // notes flag this default as a pitfall (small RPCs may be delayed).
      // Confirm where tcpNoDelay is initialized.
      this.socket.setTcpNoDelay(tcpNoDelay);
      // NOTE(review): per the notes, tcpKeepAlive comes from
      // hbase.ipc.client.tcpkeepalive (default true); confirm.
      this.socket.setKeepAlive(tcpKeepAlive);
      // connection time out is 20s
      // NOTE(review): per the notes, this is ipc.socket.timeout (default
      // 20000 ms), also not present in the config file; confirm in
      // getSocketTimeout.
      NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf));
      if (remoteId.rpcTimeout > 0) {
        // Overwrites the pingInterval field itself, so later uses of
        // pingInterval on this connection also see the RPC timeout.
        pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
      }
      // Read timeout on the socket. NOTE(review): per the notes,
      // pingInterval otherwise defaults to ipc.ping.interval (60 s), not set
      // in the config file; confirm.
      this.socket.setSoTimeout(pingInterval);
      return;
    } catch (SocketTimeoutException toe) {
      /* The max number of retries is 45,
       * which amounts to 20s*45 = 15 minutes retries.
       */
      // NOTE(review): the 45 / 20s figures depend on maxRetries and the
      // connect timeout, neither of which is visible here; confirm.
      handleConnectionFailure(timeoutFailures++, maxRetries, toe);
    } catch (IOException ie) {
      // Non-timeout I/O errors are counted separately from timeouts.
      handleConnectionFailure(ioFailures++, maxRetries, ie);
    }
  }
}
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { while (waitForWork()) {//wait here for work - read or close connection receiveResponse(); } } catch (Throwable t) { LOG.warn("Unexpected exception receiving call responses", t); markClosed(new IOException("Unexpected exception receiving call responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); } protected void receiveResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { // See HBaseServer.Call.setResponse for where we write out the response. // It writes the call.id (int), a flag byte, then optionally the length // of the response (int) followed by data. // 接受响应,一个int型的id,一个消息长度也是int型,后续是真实的数据 // Read the call id. int id = in.readInt(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = calls.get(id); // Read the flag byte byte flag = in.readByte(); boolean isError = ResponseFlag.isError(flag); if (ResponseFlag.isLength(flag)) { // Currently length if present is unused. in.readInt(); } int state = in.readInt(); // Read the state. Currently unused. if (isError) { if (call != null) { //noinspection ThrowableInstanceNeverThrown call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); } } else { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. if (call != null) { call.setValue(value); } } calls.remove(id); } catch (IOException e) { if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. 
closeException = e; } else { // Since the server did not respond within the default ping interval // time, treat this as a fatal condition and close this connection markClosed(e); } } finally { if (remoteId.rpcTimeout > 0) { cleanupCalls(remoteId.rpcTimeout); } } }
protected void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } // For serializing the data to be written. final DataOutputBuffer d = new DataOutputBuffer(); try { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); d.writeInt(0xdeadbeef); // placeholder for data length d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); // fill in the placeholder Bytes.putInt(data, 0, dataLength - 4); //noinspection SynchronizeOnNonFinalField synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC out.write(data, 0, dataLength); out.flush(); } } catch (IOException e) { markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early IOUtils.closeStream(d); } }