
HBASE Code Reading Notes-1 - PUT-3 - Submitting the Task 2 (based on 0.94.12)

2013-11-13 
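The previous post walked through the main flow of submitting a task and left three core steps open: connecting, sending the request, and handling the response. This post picks up from there.
First a Call object is created. Call is an inner class of HBaseClient; the code is simple, so here it is in full.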

    protected class Call {
        final int id;            // call id
        final Writable param;    // parameter
        Writable value;          // value, null if error
        IOException error;       // exception, null if value
        boolean done;            // true when call is done
        long startTime;

        protected Call(Writable param) {
            this.param = param;
            this.startTime = System.currentTimeMillis();
            synchronized (HBaseClient.this) {
                this.id = counter++;
            }
        }

        /**
         * Indicate when the call is complete and the
         * value or error are available.  Notifies by default.
         */
        protected synchronized void callComplete() {
            this.done = true;
            notify();            // notify caller
        }

        /**
         * Set the exception when there is an error.
         * Notify the caller the call is done.
         *
         * @param error exception thrown by the call; either local or remote
         */
        public synchronized void setException(IOException error) {
            this.error = error;
            callComplete();
        }

        /**
         * Set the return value when there is no error.
         * Notify the caller the call is done.
         *
         * @param value return value of the call.
         */
        public synchronized void setValue(Writable value) {
            this.value = value;
            callComplete();
        }

        public long getStartTime() {
            return this.startTime;
        }
    }
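
Call completes through notify(), so some thread must be waiting on the Call's monitor. That waiting side is not shown in this post. As a minimal sketch of the handshake, assuming the caller simply loops on wait() until done is set, the following uses a hypothetical SimpleCall with String payloads in place of Writable:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CallDemo {
    /** Hypothetical, simplified Call: String payloads instead of Writable. */
    public static class SimpleCall {
        private static final AtomicInteger COUNTER = new AtomicInteger();
        final int id = COUNTER.getAndIncrement();
        private String value;      // result, null if error
        private Exception error;   // failure, null if value
        private boolean done;      // true once value or error is set

        public synchronized void setValue(String value) {
            this.value = value;
            this.done = true;
            notify();              // wake the thread blocked in await()
        }

        public synchronized void setException(Exception error) {
            this.error = error;
            this.done = true;
            notify();
        }

        /** Blocks until the call completes; the loop guards against spurious wakeups. */
        public synchronized String await() throws InterruptedException {
            while (!done) {
                wait();
            }
            if (error != null) {
                throw new IllegalStateException(error);
            }
            return value;
        }
    }

    /** Simulates one request: a "receiver" thread completes the call the caller waits on. */
    public static String roundTrip(String payload) {
        SimpleCall call = new SimpleCall();
        Thread receiver = new Thread(() -> call.setValue("echo:" + payload));
        receiver.start();
        try {
            return call.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("put"));
    }
}
```

Because done is checked under the same lock that setValue takes, the caller cannot miss the notification even if the receiver finishes before the caller starts waiting.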


Next comes HBaseClient's getConnection method. It checks the cache and either returns the cached Connection or creates a new one. Connection is a subclass of Thread.
    protected Connection getConnection(InetSocketAddress addr,
                                       Class<? extends VersionedProtocol> protocol,
                                       User ticket,
                                       int rpcTimeout,
                                       Call call)
            throws IOException, InterruptedException {
        if (!running.get()) {
            // the client is stopped
            throw new IOException("The client is stopped");
        }
        Connection connection;
        /* we could avoid this allocation for each RPC by having a
         * connectionsId object and with set() method. We need to manage the
         * refs for keys in HashMap properly. For now its ok.
         */
        ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
        synchronized (connections) {
            // a CHM (ConcurrentHashMap) serves as the cache here
            connection = connections.get(remoteId);
            if (connection == null) {
                // no cached connection, so create one
                connection = createConnection(remoteId);
                connections.put(remoteId, connection);
            }
        }
        connection.addCall(call);

        //we don't invoke the method below inside "synchronized (connections)"
        //block above. The reason for that is if the server happens to be slow,
        //it will take longer to establish a connection and that will slow the
        //entire system down.
        //Moreover, if the connection is currently created, there will be many threads
        // waiting here; as setupIOstreams is synchronized. If the connection fails with a
        // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
        // (Author's note) The comment above explains why setupIOstreams is kept out of
        // the synchronized block: it is a blocking call, so one slow server would
        // stall the whole system. And if the connection attempt ends in a timeout,
        // every thread waiting on it fails together.
        connection.setupIOstreams();
        return connection;
    }
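
The locking pattern in getConnection can be sketched on its own: hold the map lock only for the lookup-or-create step, then do the potentially slow setup under the connection's own lock. The class below is a hypothetical simplification (String keys, no real sockets), not the HBase implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionCacheDemo {
    /** Hypothetical connection; setup() stands in for setupIOstreams(). */
    public static class Conn {
        final String key;
        private boolean ready;

        Conn(String key) {
            this.key = key;
        }

        /** Per-connection lock: only callers of this one connection queue up here. */
        public synchronized void setup() {
            if (ready) {
                return;            // another thread already finished the setup
            }
            // a real client would open the socket here, which may be slow
            ready = true;
        }

        public synchronized boolean isReady() {
            return ready;
        }
    }

    private final Map<String, Conn> connections = new HashMap<>();
    private int created;

    public Conn getConnection(String key) {
        Conn conn;
        synchronized (connections) {   // short critical section: lookup or create only
            conn = connections.get(key);
            if (conn == null) {
                conn = new Conn(key);
                connections.put(key, conn);
                created++;
            }
        }
        conn.setup();                  // slow part runs outside the map lock
        return conn;
    }

    public int createdCount() {
        synchronized (connections) {
            return created;
        }
    }

    public static void main(String[] args) {
        ConnectionCacheDemo cache = new ConnectionCacheDemo();
        Conn a = cache.getConnection("rs1:60020");
        Conn b = cache.getConnection("rs1:60020");
        System.out.println((a == b) + " " + cache.createdCount());
    }
}
```

With this split, a slow server only delays threads that want that particular connection; lookups for other servers pass through the map lock immediately.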


Now let's look at the method that supposedly could stall the whole system.
    /**
     * Connect to the server and set up the I/O streams. It then sends
     * a header to the server and starts
     * the connection thread that waits for responses.
     *
     * @throws java.io.IOException e
     */
    protected synchronized void setupIOstreams()
            throws IOException, InterruptedException {
        // nothing to do if already connected, or if the connection is marked for closing
        if (socket != null || shouldCloseConnection.get()) {
            return;
        }
        // check whether the target host is already known to have failed
        if (failedServers.isFailedServer(remoteId.getAddress())) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Not trying to connect to " + remoteId.getAddress() +
                        " this server is in the failed servers list");
            }
            IOException e = new FailedServerException(
                    "This server is in the failed servers list: " + remoteId.getAddress());
            markClosed(e);
            close();
            throw e;
        }
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Connecting to " + remoteId);
            }
            setupConnection(); // this is where the connection is established
            this.in = new DataInputStream(new BufferedInputStream
                    (new PingInputStream(NetUtils.getInputStream(socket))));
            this.out = new DataOutputStream
                    (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
            writeHeader(); // a newly created connection must send the header first
            // update last activity time
            touch();
            // start the receiver thread after the socket connection has been set up
            start();
        } catch (Throwable t) {
            failedServers.addToFailedServers(remoteId.address);
            IOException e;
            if (t instanceof IOException) {
                e = (IOException) t;
            } else {
                e = new IOException("Could not set up IO Streams", t);
            }
            markClosed(e);
            close();
            throw e;
        }
    }


Establishing the connection:
    protected synchronized void setupConnection() throws IOException {
        short ioFailures = 0;
        short timeoutFailures = 0;
        while (true) {
            try {
                this.socket = socketFactory.createSocket();
                // TCP_NODELAY, from hbase.ipc.client.tcpnodelay, default false.
                // An important parameter with an unfortunate default, and it is
                // not even listed in the configuration file.
                this.socket.setTcpNoDelay(tcpNoDelay);
                // keep-alive, from hbase.ipc.client.tcpkeepalive, default true
                this.socket.setKeepAlive(tcpKeepAlive);
                // connection time out is 20s
                // (ipc.socket.timeout, default 20000; not in the configuration file either)
                NetUtils.connect(this.socket, remoteId.getAddress(),
                        getSocketTimeout(conf));
                if (remoteId.rpcTimeout > 0) {
                    pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
                }
                // ipc.ping.interval, default 60s; again absent from the configuration file
                this.socket.setSoTimeout(pingInterval);
                return;
            } catch (SocketTimeoutException toe) {
                /* The max number of retries is 45,
                 * which amounts to 20s*45 = 15 minutes retries.
                 */
                handleConnectionFailure(timeoutFailures++, maxRetries, toe);
            } catch (IOException ie) {
                handleConnectionFailure(ioFailures++, maxRetries, ie);
            }
        }
    }
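
The retry structure of setupConnection can be isolated from the socket details. The sketch below keeps the two separate failure counters and a shared retry limit. handleConnectionFailure is not shown in this post, so the handleFailure helper here is an assumption: it rethrows once the counter has reached the limit and otherwise lets the loop continue. Attempt and demo are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;

public class RetryDemo {
    /** Hypothetical stand-in for a single connect attempt. */
    public interface Attempt {
        void run() throws IOException;
    }

    /** Assumed behaviour: give up once this failure kind has used its budget. */
    static void handleFailure(int curRetries, int maxRetries, IOException e) throws IOException {
        if (curRetries >= maxRetries) {
            throw e;
        }
        // a real client would also close the socket and pause before retrying
    }

    /** Returns how many attempts were needed to connect. */
    public static int connectWithRetries(Attempt attempt, int maxRetries) throws IOException {
        int ioFailures = 0;
        int timeoutFailures = 0;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;
            } catch (SocketTimeoutException toe) {
                // timeouts are counted separately from other I/O errors
                handleFailure(timeoutFailures++, maxRetries, toe);
            } catch (IOException ie) {
                handleFailure(ioFailures++, maxRetries, ie);
            }
        }
    }

    /** Simulates an endpoint that times out `failures` times; returns -1 if we gave up. */
    public static int demo(int failures, int maxRetries) {
        int[] remaining = {failures};
        try {
            return connectWithRetries(() -> {
                if (remaining[0]-- > 0) {
                    throw new SocketTimeoutException("simulated timeout");
                }
            }, maxRetries);
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 2));
        System.out.println(demo(3, 2));
    }
}
```

Since the counters are independent, a run that alternates between timeouts and other I/O errors gets a separate budget for each kind.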


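With the connection established, the thread starts running. The logic is simple: loop waiting for responses, then close the connection.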
    public void run() {
        if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": starting, having connections "
                    + connections.size());
        try {
            while (waitForWork()) { //wait here for work - read or close connection
                receiveResponse();
            }
        } catch (Throwable t) {
            LOG.warn("Unexpected exception receiving call responses", t);
            markClosed(new IOException("Unexpected exception receiving call responses", t));
        }
        close();
        if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": stopped, remaining connections "
                    + connections.size());
    }

    protected void receiveResponse() {
        if (shouldCloseConnection.get()) {
            return;
        }
        touch();
        try {
            // See HBaseServer.Call.setResponse for where we write out the response.
            // It writes the call.id (int), a flag byte, then optionally the length
            // of the response (int) followed by data.
            // (Author's note) In short: an int id, an int message length,
            // then the actual data.

            // Read the call id.
            int id = in.readInt();
            if (LOG.isDebugEnabled())
                LOG.debug(getName() + " got value #" + id);
            Call call = calls.get(id);
            // Read the flag byte
            byte flag = in.readByte();
            boolean isError = ResponseFlag.isError(flag);
            if (ResponseFlag.isLength(flag)) {
                // Currently length if present is unused.
                in.readInt();
            }
            int state = in.readInt(); // Read the state.  Currently unused.
            if (isError) {
                if (call != null) {
                    //noinspection ThrowableInstanceNeverThrown
                    call.setException(new RemoteException(WritableUtils.readString(in),
                            WritableUtils.readString(in)));
                }
            } else {
                Writable value = ReflectionUtils.newInstance(valueClass, conf);
                value.readFields(in); // read value
                // it's possible that this call may have been cleaned up due to a RPC
                // timeout, so check if it still exists before setting the value.
                if (call != null) {
                    call.setValue(value);
                }
            }
            calls.remove(id);
        } catch (IOException e) {
            if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
                // Clean up open calls but don't treat this as a fatal condition,
                // since we expect certain responses to not make it by the specified
                // {@link ConnectionId#rpcTimeout}.
                closeException = e;
            } else {
                // Since the server did not respond within the default ping interval
                // time, treat this as a fatal condition and close this connection
                markClosed(e);
            }
        } finally {
            if (remoteId.rpcTimeout > 0) {
                cleanupCalls(remoteId.rpcTimeout);
            }
        }
    }
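
The read order in receiveResponse (call id, flag byte, optional length, state, then the value) can be tried out against an in-memory stream. This is a rough sketch rather than the HBase wire format: the bit values of ERROR_BIT and LENGTH_BIT are assumptions (the post only shows ResponseFlag.isError and ResponseFlag.isLength), the payload is a UTF string standing in for a Writable, and the length field simply carries the payload size because the reader ignores it anyway.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ResponseFrameDemo {
    // Assumed bit values, chosen for illustration only.
    static final byte ERROR_BIT = 0x1;
    static final byte LENGTH_BIT = 0x2;

    /** Builds a success frame: id, flag, length, state, payload. */
    public static byte[] encode(int id, String payload) {
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            new DataOutputStream(body).writeUTF(payload);

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(id);
            out.writeByte(LENGTH_BIT);     // length present, no error
            out.writeInt(body.size());     // length field (ignored by the reader)
            out.writeInt(0);               // state (ignored by the reader)
            out.write(body.toByteArray());
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the frame in the same order as receiveResponse. */
    public static String decode(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int id = in.readInt();
            byte flag = in.readByte();
            boolean isError = (flag & ERROR_BIT) != 0;
            if ((flag & LENGTH_BIT) != 0) {
                in.readInt();              // length, currently unused
            }
            in.readInt();                  // state, currently unused
            String payload = in.readUTF();
            return id + (isError ? ":error:" : ":ok:") + payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(7, "done")));
    }
}
```

The call id comes first so the receiver thread can look up the matching Call before it knows whether the rest of the frame is a value or an error.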


Going back to the sendParam method, which is fairly simple:
    protected void sendParam(Call call) {
        if (shouldCloseConnection.get()) {
            return;
        }
        // For serializing the data to be written.
        final DataOutputBuffer d = new DataOutputBuffer();
        try {
            if (LOG.isDebugEnabled())
                LOG.debug(getName() + " sending #" + call.id);
            d.writeInt(0xdeadbeef); // placeholder for data length
            d.writeInt(call.id);
            call.param.write(d);
            byte[] data = d.getData();
            int dataLength = d.getLength();
            // fill in the placeholder
            Bytes.putInt(data, 0, dataLength - 4);
            //noinspection SynchronizeOnNonFinalField
            synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
                out.write(data, 0, dataLength);
                out.flush();
            }
        } catch (IOException e) {
            markClosed(e);
        } finally {
            //the buffer is just an in-memory buffer, but it is still polite to
            // close early
            IOUtils.closeStream(d);
        }
    }
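
sendParam builds a length-prefixed frame by writing a placeholder first and patching in the real length once the payload size is known. The same trick can be shown with a plain ByteBuffer, which is big-endian by default like DataOutput. RequestFrameDemo and its method names are hypothetical, and the byte array stands in for the serialized Writable:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RequestFrameDemo {
    /** Lays out [length][call id][param bytes], where length excludes its own 4 bytes. */
    public static byte[] frame(int callId, byte[] param) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + param.length);
        buf.putInt(0xdeadbeef);          // placeholder for data length
        buf.putInt(callId);
        buf.put(param);
        int total = buf.position();
        buf.putInt(0, total - 4);        // fill in the placeholder at offset 0
        return buf.array();
    }

    /** Reads back the length prefix of a frame. */
    public static int lengthField(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(0);
    }

    public static void main(String[] args) {
        byte[] f = frame(5, "row1".getBytes(StandardCharsets.UTF_8));
        System.out.println(f.length + " bytes, length field = " + lengthField(f));
    }
}
```

Writing the placeholder first avoids serializing the parameter twice just to learn its size. The easily recognized 0xdeadbeef value also makes a frame that was never patched stand out in a dump.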


That completes the read-through of the entire client-side PUT flow. Next up: digging into the server side.
