开始读zookeeper代码,首先启动zookeeper,看到
java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /home/uniseraph/dev/zookeeper-3.3.3/bin/../build/classes:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../zookeeper-3.3.3.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ivy-2.1.0.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ant-eclipse-1.0-jvm1.2.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /home/uniseraph/dev/zookeeper-3.3.3/bin/../conf/zoo.cfg
确认入口函数在QuorumPeerMain。
1. 去掉一些注释,找到关键初始化点
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
2. 解析输入参数,选择cluster/standalone
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
3 runFromConfig读取配置文件,进行系统初始化
3.1 注册一个MBean
ManagedUtil.registerLog4jMBeans();
3.2 初始化一个连接工厂,这里将是关键,为nio socket server进行一些初始化动作
NIOServerCnxn.Factory cnxnFactory =
new NIOServerCnxn.Factory(config.getClientPortAddress(),
config.getMaxClientCnxns());
public Factory(InetSocketAddress addr, int maxcc) throws IOException {
super("NIOServerCxn.Factory:" + addr);
setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
3.3 初始化一个QuorumPeer,将配置文件中参数赋值给它,并启动之。QuorumPeer负责处理quorum protol,选出leader。
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.start();
public synchronized void start() {
try {
zkDb.loadDataBase();
} catch(IOException ie) {
LOG.fatal("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
cnxnFactory.start();
startLeaderElection();
super.start();
}
3.3.1 quorumPeer.start启动时候先加载硬盘上的zk数据;
try {
zkDb.loadDataBase();
} catch(IOException ie) {
LOG.fatal("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
3.3.2 启动factory的nio 监听线程,开始循环
cnxnFactory.start();
因为Factory继承自Thread,所以新起一个线程执行。
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
clear();
LOG.info("NIOServerCnxn factory exited run method");
}
3.3.2.1 首先在一个无限循环中,nio selector进行监听,每1秒或者有数据来就唤醒一次。
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
3.3.2.2 如果是有链接请求来了,则accept之,并创建链接上下文,且在selector中注册SelectionKey.OP_READ事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
}
3.3.2.3 如果是READ/WRITE事件,处理之
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
}
NIOServerCnxn处理另外分析;
3.3 启动select leader算法
startLeaderElection();
分享到:
相关推荐
zookeeper简单示例代码,包括对象、节点、通信协议、序列化、acl权限、curator应用、zkclient应用等。
dubbo+zookeeper例子代码和部署说明,demo文件下载,包含zookeeper安装文件,dubbo的监控war已经dubbo的源码
Zookeeper权限控制代码,并且描述了在linux客户端指令鉴权
我也是网上,,找的,但是有部分没跑通
zookeeper 经典应用设计 锁、同步和队列分析
zookeeper代码,节点,观察者等等核心代码示例,让你学习更简单
zookeeper客户端原理代码操作应用场景。
改代码主要功能能是完成通过zookeeper同步mysql数据,具体的介绍可以参考博文zookeeperMaster选举以及数据同步
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
springMVC+dubbo+zookeeper.zip整合 消费端服务端
zookeeper课上的一些代码整理,有关zookeeper锁,注册发现
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
kafka配置文件zookeeper参数.md
zookeeper集群安装,1. 在根目录创建zookeeper文件夹(service1、service2、service3都创建) [root@localhost /]# mkdir zookeeper 通过Xshell上传文件到service1服务器:上传zookeeper-3.4.6.tar.gz到/software...
ActiveMQ与Zookeeper集群测试代码,用于测试高可用效果 。。。。。。。。。。。。。。。
apache-zookeeper-3.5.10-bin 环境搭配 ...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在$zookeeper_home\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在$zookeeper_home\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
springboot整合zookeeper项目源代码
学习时个人编写的样例代码,通过zookeeper实现分布式锁与简单的注册中心