首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

使用zookeeper实现的简单的集群服务管理

2013-12-26 
摘要:使用zookeeper实现的简单的集群服务管理(接口定义位于 zhenghui.lsf.configserver.service 包,作者 zhenghui)。

使用zookeeper实现的简单的集群服务管理
package zhenghui.lsf.configserver.service;

/**
 * Cluster registration service: records and looks up the addresses of named services.
 *
 * <p>User: zhenghui, Date: 13-12-22, Time: 4:57 PM
 */
public interface AddressService {

    /**
     * Sets the address of the target service.
     *
     * @param serviceUniqueName name identifying the target service
     * @param address           address to record for that service
     */
    void setServiceAddresses(String serviceUniqueName, String address);

    /**
     * Gets an address of the target service.
     *
     * @param serviceUniqueName name identifying the target service
     * @return a service address, or {@code null} when no service address is available
     */
    String getServiceAddress(String serviceUniqueName);
}

对应的实现类如下:

AddressComponent.java

package zhenghui.lsf.configserver.impl;import org.apache.commons.lang.StringUtils;import org.springframework.beans.factory.InitializingBean;import zhenghui.lsf.configserver.service.AddressService;import java.util.List;import java.util.Map;import java.util.Random;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicBoolean;/** * User: zhenghui * Date: 13-12-22 * Time: 下午4:58 * 基于zk实现.根据简单原则,不做高级路由处理.直接用随机来做路由. */public class AddressComponent extends ZookeeperWatcher implements AddressService, InitializingBean { private AtomicBoolean inited = new AtomicBoolean(false); private static final String DEFAULT_SERVER_PATH = "/zhenghui/lsf/address/"; private static final int DEFAULT_TIME_OUT = 30000; /** * 服务地址cache */ private Map<String,List<String>> serviceAddressCache = new ConcurrentHashMap<String, List<String>>(); /** * zk服务器的地址. */ private String zkAdrress = "10.125.195.174:2181"; @Override public void setServiceAddresses(String serviceUniqueName, String address) { if(StringUtils.isBlank(serviceUniqueName)){ return; } String path = DEFAULT_SERVER_PATH + serviceUniqueName; createPath(path,address); } private void init() throws Exception{ // 避免被初始化多次 if (!inited.compareAndSet(false, true)) { return; } createConnection(zkAdrress,DEFAULT_TIME_OUT); } @Override public String getServiceAddress(String serviceUniqueName) { if(StringUtils.isBlank(serviceUniqueName)){ return null; } String path = DEFAULT_SERVER_PATH + serviceUniqueName; List<String> addressList = serviceAddressCache.get(path) == null ? 
getChildren(path,true) : serviceAddressCache.get(path); if(addressList == null || addressList.isEmpty()){ return null; } serviceAddressCache.put(path,addressList); return addressList.get(new Random().nextInt(addressList.size())); } @Override public void afterPropertiesSet() throws Exception { init(); } public void setZkAdrress(String zkAdrress) { this.zkAdrress = zkAdrress; } @Override protected void addressChangeHolder(String path) { serviceAddressCache.remove(path); }}?

ZookeeperWatcher.java

package zhenghui.lsf.configserver.impl;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import zhenghui.lsf.exception.HSFException;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;/** * User: zhenghui * Date: 13-12-25 * Time: 下午4:13 * 一些zk的封装 * 这里注意,别忘记初始化zk的path.比如创建节点的时候,path是 "/zhenghui/lsf/address/interfacename:1.0.0" 那么请保证 "/zhenghui/lsf/address"节点是存在的,否则会报错. */public abstract class ZookeeperWatcher implements Watcher {    private Logger logger = LoggerFactory.getLogger(ZookeeperWatcher.class);    private CountDownLatch connectedSemaphore = new CountDownLatch(1);    private ZooKeeper zk;    private static final String DEFAULT_DATA = "zhenghui";    /**     * 用来记录watch被调用次数     */    AtomicInteger seq = new AtomicInteger();    /**     * 地址变更,需要做对应的处理.比如缓存清理等     */    abstract protected void addressChangeHolder(String path);    /**     * 创建zk连接     *     */    protected void createConnection(String connectString, int sessionTimeout) throws HSFException {        //先关闭连接        releaseConnection();        try {            zk = new ZooKeeper(connectString, sessionTimeout, this);            logger.info(connectString + "开始连接ZK服务器");            connectedSemaphore.await();        } catch (Exception e) {            logger.error("zhenghui.lsf.configserver.impl.AddressComponent.createConnection error");            throw new HSFException("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createConnection error", e);        }    }    /**     * 关闭ZK连接     */    protected void releaseConnection() {        if (zk != null) {            try {                this.zk.close();            } catch (Exception e) {                logger.error("zhenghui.lsf.configserver.impl.AddressComponent.releaseConnection error");            }        }    }    private static final String separator = "/";    /**     * 创建对应的节点.     
*/    protected boolean createPath(String path, String data) {        try {            //先判断path是否存在            Stat stat = exists(path, true);            //如果不存在,则创建            if(stat == null){                this.zk.create(path,DEFAULT_DATA.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);                logger.info("父节点创建成功.path= " + path);            }            //再查看子节点是否有            String childPath = path + separator + data;            //子节点不需要对应的watcher.因为父节点已经有针对child data change的watcher处理了            stat = exists(childPath,false);            if(stat == null){                this.zk.create(childPath,DEFAULT_DATA.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);                logger.info("子节点创建成功.path= " + childPath);            }            return true;        } catch (Exception e) {            logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createPath",e);            return false;        }    }    protected Stat exists(String path, boolean needWatch) {        try {            return this.zk.exists(path, needWatch);        } catch (Exception e) {            return null;        }    }    /**     * 获取子节点     *     * @param path 节点path     */    protected List<String> getChildren(String path, boolean needWatch) {        try {            return this.zk.getChildren(path, needWatch);        } catch (Exception e) {            logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.getChildren", e);            return null;        }    }    @Override    public void process(WatchedEvent event){//        try {//            Thread.sleep(300);//        } catch (Exception e) {}        if (event == null) return;        String logPrefix = "Watch-" + seq.incrementAndGet() + ":";        logger.info(logPrefix + event.toString());        // 连接状态        Watcher.Event.KeeperState keeperState = event.getState();        // 事件类型        Watcher.Event.EventType eventType = event.getType();        // 受影响的path        String path = 
event.getPath();        if (Watcher.Event.KeeperState.SyncConnected == keeperState) {            // 成功连接上ZK服务器            if (Watcher.Event.EventType.None == eventType) {                logger.info(logPrefix + "成功连接上ZK服务器");                connectedSemaphore.countDown();            } else if (Watcher.Event.EventType.NodeCreated == eventType) {                logger.info(logPrefix + "节点创建");//                this.exists(path, true);                addressChangeHolder(path);            } else if (Watcher.Event.EventType.NodeDataChanged == eventType) {                logger.info(logPrefix + "节点数据更新");//                addressChangeHolder(path);            } else if (Watcher.Event.EventType.NodeChildrenChanged == eventType) {                logger.info(logPrefix + "子节点变更");                addressChangeHolder(path);            } else if (Watcher.Event.EventType.NodeDeleted == eventType) {                logger.error(logPrefix + "节点 " + path + " 被删除");//                addressChangeHolder(path);            }        }        //下面可以做一些重连的工作.        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {            logger.error(logPrefix + "与ZK服务器断开连接");        } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {            logger.error(logPrefix + "权限检查失败");        } else if (Watcher.Event.KeeperState.Expired == keeperState) {            logger.error(logPrefix + "会话失效");        }    }}

热点排行