uniseraph

Reading the ZooKeeper source code

 
I started reading the ZooKeeper code by first starting ZooKeeper and looking at the command line it runs:
java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /home/uniseraph/dev/zookeeper-3.3.3/bin/../build/classes:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../zookeeper-3.3.3.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ivy-2.1.0.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ant-eclipse-1.0-jvm1.2.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /home/uniseraph/dev/zookeeper-3.3.3/bin/../conf/zoo.cfg


This confirms that the entry point is QuorumPeerMain.

1. With some comments stripped out, find the key initialization point
 
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        // The original wraps this call in a try/catch; error handling is omitted here.
        main.initializeAndRun(args);
    }




2. Parse the input arguments and choose cluster or standalone mode
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }
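
The branch above can be illustrated with a small stand-alone sketch. It is not ZooKeeper code: ModeSelectionSketch, countServers and chooseMode are hypothetical names, the host names and ports are made up, and it assumes that quorum members appear in zoo.cfg as keys beginning with "server.". The real parser does more validation than this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of ZooKeeper: picks a run mode by checking
// whether the configuration text defines any "server.<id>" entries.
class ModeSelectionSketch {

    // Counts keys of the form "server.<id>" in zoo.cfg-style text.
    static int countServers(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int count = 0;
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                count++;
            }
        }
        return count;
    }

    // Same shape as the if/else above: servers defined means quorum mode,
    // otherwise fall back to standalone.
    static String chooseMode(String cfgText) {
        return countServers(cfgText) > 0 ? "quorum" : "standalone";
    }

    public static void main(String[] args) {
        // Illustrative configuration values only.
        String standalone = "tickTime=2000\ndataDir=/tmp/zookeeper\nclientPort=2181\n";
        String cluster = standalone
                + "server.1=host1:2888:3888\n"
                + "server.2=host2:2888:3888\n"
                + "server.3=host3:2888:3888\n";
        System.out.println(chooseMode(standalone)); // prints "standalone"
        System.out.println(chooseMode(cluster));    // prints "quorum"
    }
}
```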


3. runFromConfig reads the configuration and initializes the system
3.1 Register the log4j MBeans
       ManagedUtil.registerLog4jMBeans();

3.2 Initialize a connection factory. This is the key step: it performs the setup for the NIO socket server.
NIOServerCnxn.Factory cnxnFactory =
              new NIOServerCnxn.Factory(config.getClientPortAddress(),
                      config.getMaxClientCnxns());

     
public Factory(InetSocketAddress addr, int maxcc) throws IOException {
            super("NIOServerCxn.Factory:" + addr);
            setDaemon(true);
            maxClientCnxns = maxcc;
            this.ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            LOG.info("binding to port " + addr);
            ss.socket().bind(addr);
            ss.configureBlocking(false);
            ss.register(selector, SelectionKey.OP_ACCEPT);
        }
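
The same setup sequence can be tried in isolation. The following is a minimal sketch, not the Factory class itself: NioListenSketch, listen and registersForAccept are hypothetical names, the selector is passed in as a parameter, and port 0 is used so that the operating system picks a free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical stand-alone version of the constructor's socket setup.
class NioListenSketch {

    // Opens a listening channel, binds it, switches it to non-blocking mode
    // and registers it with the selector for OP_ACCEPT.
    static SelectionKey listen(Selector selector, InetSocketAddress addr) {
        try {
            ServerSocketChannel ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            ss.socket().bind(addr);
            // A channel must be non-blocking before it can be registered.
            ss.configureBlocking(false);
            return ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Checks the state that the setup is expected to leave behind.
    static boolean registersForAccept() {
        try (Selector selector = Selector.open()) {
            SelectionKey key = listen(selector, new InetSocketAddress("127.0.0.1", 0));
            ServerSocketChannel ss = (ServerSocketChannel) key.channel();
            boolean ok = !ss.isBlocking()
                    && key.interestOps() == SelectionKey.OP_ACCEPT
                    && ss.socket().getLocalPort() > 0;
            ss.close();
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("registered for accept: " + registersForAccept());
    }
}
```

The order matters: calling register on a channel that is still in blocking mode throws IllegalBlockingModeException, which is why configureBlocking(false) comes first.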


3.3 Create a QuorumPeer, assign the parameters from the configuration file to it, and start it. QuorumPeer handles the quorum protocol and elects the leader.
          quorumPeer = new QuorumPeer();
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      new File(config.getDataLogDir()),
                      new File(config.getDataDir())));
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
  
          quorumPeer.start();
  

 
 
public synchronized void start() {
        try {
            zkDb.loadDataBase();
        } catch(IOException ie) {
            LOG.fatal("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
        cnxnFactory.start();        
        startLeaderElection();
        super.start();
    }


3.3.1 When quorumPeer.start is called, it first loads the ZooKeeper data from disk.
 try {
            zkDb.loadDataBase();
        } catch(IOException ie) {
            LOG.fatal("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
       


3.3.2 Start the factory's NIO listener thread, which enters its loop

        cnxnFactory.start();  


Because Factory extends Thread, start() spawns a new thread that executes run().

        public void run() {
            while (!ss.socket().isClosed()) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selected;
                    synchronized (this) {
                        selected = selector.selectedKeys();
                    }
                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                            selected);
                    Collections.shuffle(selectedList);
                    for (SelectionKey k : selectedList) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k
                                    .channel()).accept();
                            InetAddress ia = sc.socket().getInetAddress();
                            int cnxncount = getClientCnxnCount(ia);
                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                                LOG.warn("Too many connections from " + ia
                                         + " - max is " + maxClientCnxns );
                                sc.close();
                            } else {
                                LOG.info("Accepted socket connection from "
                                        + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);
                                NIOServerCnxn cnxn = createConnection(sc, sk);
                                sk.attach(cnxn);
                                addCnxn(cnxn);
                            }
                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Unexpected ops in select "
                                          + k.readyOps());
                            }
                        }
                    }
                    selected.clear();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring exception", e);
                }
            }
            clear();
            LOG.info("NIOServerCnxn factory exited run method");
        }



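3.3.2.1 Inside the loop, which runs until the server socket is closed, the NIO selector waits for events and wakes up after at most 1 second or as soon as an event arrives.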
 selector.select(1000);  
            Set<SelectionKey> selected;  
            synchronized (this) {  
                selected = selector.selectedKeys();  
            }  
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(  
                    selected);  
            Collections.shuffle(selectedList); 
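
The wait-then-shuffle step can be sketched on its own. SelectPollSketch and its methods are hypothetical names, and the reason given for the shuffle is a guess: it presumably keeps any one connection from always being served first. Note that select(0) would block indefinitely, so the sketch uses a positive timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of one polling step of the loop.
class SelectPollSketch {

    // Copies the ready set and shuffles the copy; the source is not modified.
    static <T> List<T> shuffledCopy(Collection<T> ready) {
        List<T> copy = new ArrayList<>(ready);
        Collections.shuffle(copy);
        return copy;
    }

    // Waits up to timeoutMs for events, then returns the ready keys in random order.
    static List<SelectionKey> pollOnce(Selector selector, long timeoutMs) {
        try {
            selector.select(timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return shuffledCopy(selector.selectedKeys());
    }

    // With nothing registered, select() times out and no keys are ready.
    static int readyCountOnIdleSelector(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            return pollOnce(selector, timeoutMs).size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys on idle selector: " + readyCountOnIdleSelector(100));
        System.out.println("shuffled: " + shuffledCopy(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```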


3.3.2.2 If a connection request has arrived, accept it, create the connection context, and register the new channel with the selector for SelectionKey.OP_READ events

 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k
                                    .channel()).accept();
                            InetAddress ia = sc.socket().getInetAddress();
                            int cnxncount = getClientCnxnCount(ia);
                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                                LOG.warn("Too many connections from " + ia
                                         + " - max is " + maxClientCnxns );
                                sc.close();
                            } else {
                                LOG.info("Accepted socket connection from "
                                        + sc.socket().getRemoteSocketAddress());
                                sc.configureBlocking(false);
                                SelectionKey sk = sc.register(selector,
                                        SelectionKey.OP_READ);
                                NIOServerCnxn cnxn = createConnection(sc, sk);
                                sk.attach(cnxn);
                                addCnxn(cnxn);
                            }
                        } 
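
The maxClientCnxns check above limits connections per client address. A minimal sketch of that bookkeeping follows. ConnectionLimitSketch, tryAccept and release are hypothetical names, and this is not how getClientCnxnCount is implemented. As in the condition above, a limit of 0 or less means no limit.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-address connection counter.
class ConnectionLimitSketch {
    private final int maxClientCnxns; // 0 or less disables the limit
    private final Map<InetAddress, Integer> counts = new HashMap<>();

    ConnectionLimitSketch(int maxClientCnxns) {
        this.maxClientCnxns = maxClientCnxns;
    }

    // Returns true and records the connection if the address is under its limit;
    // returns false if the caller should close the socket instead.
    synchronized boolean tryAccept(InetAddress ia) {
        int current = counts.getOrDefault(ia, 0);
        if (maxClientCnxns > 0 && current >= maxClientCnxns) {
            return false;
        }
        counts.put(ia, current + 1);
        return true;
    }

    // Forgets one connection from the address, removing the entry at zero.
    synchronized void release(InetAddress ia) {
        counts.computeIfPresent(ia, (k, v) -> v > 1 ? v - 1 : null);
    }

    public static void main(String[] args) {
        ConnectionLimitSketch limiter = new ConnectionLimitSketch(2);
        InetAddress ia = InetAddress.getLoopbackAddress();
        System.out.println(limiter.tryAccept(ia)); // prints "true"
        System.out.println(limiter.tryAccept(ia)); // prints "true"
        System.out.println(limiter.tryAccept(ia)); // prints "false"
    }
}
```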



3.3.2.3 If it is a READ/WRITE event, handle it
 else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        }
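
The dispatch relies on readyOps() being a bit mask, so one key can report several operations at once. The three branches of the loop can be sketched as a pure function. ReadyOpsSketch and classify are hypothetical names used only for illustration.

```java
import java.nio.channels.SelectionKey;

// Hypothetical illustration of the bit-mask tests used in the select loop.
class ReadyOpsSketch {

    // Returns which branch of the loop a given readyOps value would take.
    static String classify(int readyOps) {
        if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
            return "accept";
        } else if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            return "io";
        } else {
            return "unexpected";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(SelectionKey.OP_ACCEPT));                        // prints "accept"
        System.out.println(classify(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // prints "io"
        System.out.println(classify(SelectionKey.OP_CONNECT));                       // prints "unexpected"
    }
}
```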


How NIOServerCnxn processes the I/O will be analyzed separately.

3.3.3 Start the leader election algorithm
startLeaderElection();
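
The startup order in quorumPeer.start() walked through above (load the data, start the connection factory, start leader election, then start the peer's own thread) follows a common pattern: a Thread subclass overrides start() to do its preparation before calling super.start(). A minimal sketch of that pattern follows, with StartOrderSketch as a hypothetical class that only records step names instead of doing real work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical Thread subclass that records the order of its startup steps.
class StartOrderSketch extends Thread {
    final List<String> steps = Collections.synchronizedList(new ArrayList<String>());

    @Override
    public synchronized void start() {
        // Preparation runs on the caller's thread, before the new thread exists.
        steps.add("loadDataBase");
        steps.add("cnxnFactory.start");
        steps.add("startLeaderElection");
        // Only now is the new thread created; it will execute run().
        super.start();
    }

    @Override
    public void run() {
        steps.add("run");
    }

    // Starts one instance, waits for it to finish and returns the recorded steps.
    static List<String> runOnce() {
        StartOrderSketch peer = new StartOrderSketch();
        peer.start();
        try {
            peer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(peer.steps);
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```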


