首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > 云计算 >

深入辨析SolrCloud(三)

2012-09-02 
深入剖析SolrCloud(三)?在上一篇中介绍了SolrCloud的第一个模块---构建管理solr集群状态信息的zookeeper集

深入剖析SolrCloud(三)

?

在上一篇中介绍了SolrCloud的第一个模块---构建管理solr集群状态信息的zookeeper集群。当我们在solr服务器启动时拥有了这样一个Zookeeper集群后,显然我们需要连接到Zookeeper集群的方便手段,在这一篇中我将对Zookeeper客户端相关的各个封装类进行分析。

SolrZkClient类是Solr服务器用来与Zookeeper集群进行通信的接口类,它包含的主要组件有:

?

private ConnectionManager connManager;private volatile SolrZooKeeper keeper;private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();

?其中ConnectionManager是Watcher的实现类,主要负责对客户端与Zookeeper集群之间连接的状态变化信息进行响应,关于Watcher的详细介绍,可以参考http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkWatches,

?

SolrZooKeeper类是一个包装类,没有实际意义,ZkCmdExecutor类是负责在连接失败的情况下,重试某种操作特定次数,具体的操作是ZkOperation这个抽象类的具体实现子类,其execute方法中包含了具体操作步骤,这些操作包括新建一个Znode节点,读取Znode节点数据,创建Znode路径,删除Znode节点等Zookeeper操作。

首先来看它的构造函数,先创建ConnectionManager对象来响应两端之间的状态变化信息,然后ZkClientConnectionStrategy类是一个连接策略抽象类,它包含连接和重连两种策略,并且采用模板方法模式,具体的实现是通过静态累不类ZkUpdate来实现的,DefaultConnectionStrategy是它的一个实现子类,它覆写了connect和reconnect两个连接策略方法。

?

public SolrZkClient(String zkServerAddress, int zkClientTimeout,      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,      TimeoutException, IOException {    connManager = new ConnectionManager("ZooKeeperConnection Watcher:"        + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);    strat.connect(zkServerAddress, zkClientTimeout, connManager,        new ZkUpdate() {          @Override          public void update(SolrZooKeeper zooKeeper) {            SolrZooKeeper oldKeeper = keeper;            keeper = zooKeeper;            if (oldKeeper != null) {              try {                oldKeeper.close();              } catch (InterruptedException e) {                // Restore the interrupted status                Thread.currentThread().interrupt();                log.error("", e);                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,                    "", e);              }            }          }        });    connManager.waitForConnected(clientConnectTimeout);    numOpens.incrementAndGet();  }

?值得注意的是,构造函数中生成的ZkUpdate匿名类对象,它的update方法会被调用,在这个方法里,会首先将已有的老的SolrZooKeeperg关闭掉,然后放置上一个新的SolrZooKeeper。做好这些准备工作以后,就会去连接Zookeeper服务器集群,

?

?

connManager.waitForConnected(clientConnectTimeout);//连接zk服务器集群,默认30秒超时时间

?

其实具体的连接动作是new SolrZooKeeper(serverAddress, timeout, watcher)引发的,上面那句代码只是在等待指定时间,看是否已经连接上。

如果连接Zookeeper服务器集群成功,那么就可以进行Zookeeper的常规操作了:

1) 是否已经连接

?

public boolean isConnected() {    return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;  }

?2) ?是否存在某个路径的Znode

public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {    if (retryOnConnLoss) {      return zkCmdExecutor.retryOperation(new ZkOperation() {        @Override        public Stat execute() throws KeeperException, InterruptedException {          return keeper.exists(path, watcher);        }      });    } else {      return keeper.exists(path, watcher);    }  }

?3) 创建一个Znode节点

public String create(final String path, final byte data[], final List<ACL> acl, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {    if (retryOnConnLoss) {      return zkCmdExecutor.retryOperation(new ZkOperation() {        @Override        public String execute() throws KeeperException, InterruptedException {          return keeper.create(path, data, acl, createMode);        }      });    } else {      return keeper.create(path, data, acl, createMode);    }  }

?4) ?获取指定路径下的孩子Znode节点

public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {    if (retryOnConnLoss) {      return zkCmdExecutor.retryOperation(new ZkOperation() {        @Override        public List<String> execute() throws KeeperException, InterruptedException {          return keeper.getChildren(path, watcher);        }      });    } else {      return keeper.getChildren(path, watcher);    }  }

?5) 获取指定Znode上附加的数据

public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss) throws KeeperException, InterruptedException {    if (retryOnConnLoss) {      return zkCmdExecutor.retryOperation(new ZkOperation() {        @Override        public byte[] execute() throws KeeperException, InterruptedException {          return keeper.getData(path, watcher, stat);        }      });    } else {      return keeper.getData(path, watcher, stat);    }  }

?6) ?在指定Znode上设置数据

public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss) throws KeeperException, InterruptedException {    if (retryOnConnLoss) {      return zkCmdExecutor.retryOperation(new ZkOperation() {        @Override        public Stat execute() throws KeeperException, InterruptedException {          return keeper.setData(path, data, version);        }      });    } else {      return keeper.setData(path, data, version);    }  }

?7) 创建路径

public void makePath(String path, byte[] data, CreateMode createMode, Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {    if (log.isInfoEnabled()) {      log.info("makePath: " + path);    }    boolean retry = true;        if (path.startsWith("/")) {      path = path.substring(1, path.length());    }    String[] paths = path.split("/");    StringBuilder sbPath = new StringBuilder();    for (int i = 0; i < paths.length; i++) {      byte[] bytes = null;      String pathPiece = paths[i];      sbPath.append("/" + pathPiece);      final String currentPath = sbPath.toString();      Object exists = exists(currentPath, watcher, retryOnConnLoss);      if (exists == null || ((i == paths.length -1) && failOnExists)) {        CreateMode mode = CreateMode.PERSISTENT;        if (i == paths.length - 1) {          mode = createMode;          bytes = data;          if (!retryOnConnLoss) retry = false;        }        try {          if (retry) {            final CreateMode finalMode = mode;            final byte[] finalBytes = bytes;            zkCmdExecutor.retryOperation(new ZkOperation() {              @Override              public Object execute() throws KeeperException, InterruptedException {                keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);                return null;              }            });          } else {            keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);          }        } catch (NodeExistsException e) {                    if (!failOnExists) {            // TODO: version ? for now, don't worry about race            setData(currentPath, data, -1, retryOnConnLoss);            // set new watch            exists(currentPath, watcher, retryOnConnLoss);            return;          }                    // ignore unless it's the last node in the path          if (i == paths.length - 1) {            throw e;          }        }        if(i == paths.length -1) {          // set new watch          exists(currentPath, watcher, retryOnConnLoss);        }      } else if (i == paths.length - 1) {        // TODO: version ? for now, don't worry about race        setData(currentPath, data, -1, retryOnConnLoss);        // set new watch        exists(currentPath, watcher, retryOnConnLoss);      }    }  }

?8) 删除指定Znode

public void delete(final String path, final int version, boolean retryOnConnLoss) throws InterruptedException, KeeperException {    if (retryOnConnLoss) {      zkCmdExecutor.retryOperation(new ZkOperation() {        @Override        public Stat execute() throws KeeperException, InterruptedException {          keeper.delete(path, version);          return null;        }      });    } else {      keeper.delete(path, version);    }  }

??我们再回过头来看看ConnectionManager类是如何响应两端的连接状态信息的变化的,它最重要的方法是process方法,当它被触发回调时,会从WatchedEvent参数中得到事件的各种状态信息,比如连接成功,会话过期(此时需要进行重连),连接断开等。

public synchronized void process(WatchedEvent event) {    if (log.isInfoEnabled()) {      log.info("Watcher " + this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType());    }    state = event.getState();    if (state == KeeperState.SyncConnected) {      connected = true;      clientConnected.countDown();    } else if (state == KeeperState.Expired) {      connected = false;      log.info("Attempting to reconnect to recover relationship with ZooKeeper...");      //尝试重新连接zk服务器      try {        connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,            new ZkClientConnectionStrategy.ZkUpdate() {              @Override              public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {                synchronized (connectionStrategy) {                  waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);                  client.updateKeeper(keeper);                  if (onReconnect != null) {                    onReconnect.command();                  }                  synchronized (ConnectionManager.this) {                    ConnectionManager.this.connected = true;                  }                }                              }            });      } catch (Exception e) {        SolrException.log(log, "", e);      }      log.info("Connected:" + connected);    } else if (state == KeeperState.Disconnected) {      connected = false;    } else {      connected = false;    }    notifyAll();  }

?

?

?

?

作者:洞庭散人

出处:http://phinecos.cnblogs.com/    

本博客遵从Creative Commons Attribution 3.0 License,若用于非商业目的,您可以自由转载,但请保留原作者信息和文章链接URL。

?

?

1 楼 cc3514772b 2012-04-27   目前也在使用solr ,想跟lz交流下。。

热点排行