本文主要是为了记录在工作中遇到的常用软件的安装过程,方便以后遇到相同情形时可以快速的查阅。主要讲述了zookeeper, kafka的安装。
本文的操作系统采用的是CentOS,可以采用shell命令查阅:lsb_release -a.
Zookeeper
Zookeeper的安装与配置
可以在http://zookeeper.apache.org/这里下载需要的安装包。这里采用的是zookeeper-3.4.6.tar.gz安装包。
1 首先将安装包解压:
[root@hidden util]# tar -zxvf zookeeper-3.4.6.tar.gz |
2 修改系统的环境变量
[root@hidden zookeeper-3.4.6]#vim /etc/profile |
3 创建Zookeeper的配置文件
[root@hidden zookeeper-3.4.6]#cd conf |
4 配置zoo.cfg
tickTime=2000 #ZooKeeper服务器心跳时间,单位为ms |
5 创建配置中的相应目录
cd /tmp |
或者可以试一下
mkdir -p /tmp/ZooKeeper/data |
6 启动ZooKeeper
cd /usr/ZooKeeper/bin |
7 可以使用ZooKeeper自带的客户端工具来查看ZooKeeper的节点建立情况(bin/zkCli.sh)
ZooKeeper API使用简介
ZooKeeper实现了一个层次命名空间的数据模型,也可以认为它就是一个小型的、精简的文件系统。它的每个节点称为znode, znode除了本身能够包含一部分数据之外,还能够拥有子节点,当节点上的数据发生变化,或者其子节点发生变化,基于watcher机制,会发出相应的通知给订阅其状态变化的客户端。
首先,实例化一个ZooKeeper对象,指定其三个参数。url为ZooKeeper服务器的地址。sessionTimeOut为会话的超时时间,ZooKeeper的会话超时时间的长度由客户端来确定,但是ZooKeeper的Server端会有两个配置,minSessionTimeout和maxSessionTimeout,minSessionTimeout的值默认为2倍的tickTime, maxSessionTimeout的值默认为20倍的tickTime,单位都是ms. tickTime也是服务端的一个配置项,是Server内部控制时间逻辑的最小时间单位,如果客户端发来的sessionTimeout超过minSessionTimeout~maxSessionTimeout这个范围,Server会自动取minSessionTimeout或者maxSessionTimeout作为sessionTimeout, 然后为这个Client新建一个session对象。最后一个参数为默认的watcher.如果包含boolean watch的读方法中传入true, 则将默认的watcher注册为所关注时间的watcher,如果传入false,则不注册任何watcher,这里暂且定为空。
ZooKeeper zooKeeper = new ZooKeeper(url,sesssionTimeout,null);//注意将安装目录下的zookeeper的jar包拷贝出来导入你的project的libs中;zookeeper的默认端口号为2181 |
1 创建节点
通过ZooKeeper的API新增一个znode节点,节点在被创建时,需要制定节点的路径(此处为/root)包含的字节数据,访问权限(如果不想设置,则制定为Ids.OPEN_ACL_UNSAFE),以及创建的节点类型,节点的类型如表所示:
节点类型 | 解释 |
---|---|
CreateMode.PRESISTENT | 持久节点,该节点在客户端断开连接后不会删除 |
CreateMode.EPHEMERAL | 临时节点,该节点在客户端断开连接后删除 |
CreateMode.PERSISTENT_SEQUENTIAL | 持久节点,该节点在客户端断开后不会删除,并将在其名下附加一个单调递增数 |
CreateMode.EPHEMERAL_SEQUENTIAL | 临时节点,该节点在客户端断开后删除,并将在其名下附加一个单调递增数 |
创建节点实例:
//创建/root节点,其包含的数据为“root data",访问权限为开放,所有人均可以访问,创建模式为持久化节点 |
2 删除节点
当不需要某个节点,或者某个节点上的信息已经失效时,使用delete方法可以将该节点删除,删除时需要制定节点的版本号version,如果设置为-1,则匹配所有的版本,ZooKeeper会比较删除的节点版本是否和服务器上的版本一致,如果不一致则抛出异常。
删除节点实例:
zooKeeper.delete("/root",-1); |
3 设置和获取节点内容
如果想在已有节点中保存数据,可以通过ZooKeeper的setData方法,将数据保存到节点上,也可以通过ZooKeeper的getData方法来获取该节点保存的数据,一个znode中最多能够保存1MB的数据。
设置和获取节点内容的实例。
//设置/root节点的数据,版本号为-1,如果匹配不到相应的节点,会抛出异常 |
setData方法设置/root节点的数据为“hello”,getData方法取得root节点上保存的节点数据,false表示不使用默认的watcher,第三个参数为Stat, 表示节点的状态,是一个传出的参数,将会返回该节点当前的状态信息。
4 添加子节点
ZooKeeper支持在已有的节点下添加子节点,同样也是用使用create()方法,但是父节点必须存在,否则会跑出异常。
创建子节点的实例:
zooKeeper.create("/root","root data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); |
5 判断节点是否存在
当进行系统初始化时,或者当前需要给一个节点创建子节点时,通常需要判断系统中的一些节点是否存在。
Stat stat = zooKeeper.exists("/root/child1",false); |
判断/root/child1节点是否存在,如果存在,返回stat不为null,否则为null.
6 watcher的实现
当节点的状态发生变化,通过watcher机制,可以让客户端得到通知,watcher需要实现org.apache.ZooKeeper.Watcher接口。节点的状态变化主要包含如表所示的几种情况。
节点状态变化 | 解释 |
---|---|
EventType.NodeDeleted | 删除节点 |
EventType.NodeChildrenChanged | 修改节点的子节点 |
EventType.NodeCreated | 创建节点 |
EventType.NodeDataChanged | 修改节点数据 |
watcher的实现实例:
public class ZKWatcher implements Watcher{ |
需要注意的是,ZooKeeper的watcher是一次性的,也就是说,每次在处理完状态变化时间之后,需要重新注册watcher,这一点很让人抓狂。这个特性也使得在处理时间和重新加上watcher这段时间发生的节点状态变化将无法被感知。
异常
1 ZooKeeer常常发生下面两种系统异常:
org.apache.ZooKeeper.KeeperException.ConnectionLossException,客户端与其中的一台服务器socket连接出现异常,连接丢失;
org.apache.ZooKeeper.KeeperException.SessionExpiredException, 客户端的session已经超过sessionTimeout,未进行任何操作。
ConnectionLossException异常可以通过重试进行处理,客户端会根据初始化ZooKeeper时传递的服务列表,自动尝试下一个服务端节点,而在这段时间内,服务端节点变更的事件就会丢失。
SessionExpiredException异常不能通过重试解决,需要应用重新创建一个新的客户端(new ZooKeeper()),这时所有的watcher和EPHEMERAL节点都将失效。
一般情况下,不采用原生态的API进行客户端操作,而是采用zkClient或者Curator进行操作,这个会在后续的文章中讲述。
2 异常
2016-08-04 20:12:54 -[INFO] - [Opening socket connection to server xx.xx.xx.86/xx.xx.xx.86:2181. Will not attempt to authenticate using SASL (unknown error)] - [org.apache.zookeeper.ClientCnxn$SendThread:975] |
原因:
- 网络不通,端口不可用等;
- zookeeper服务程序未正常启动;
- 防火墙未关闭;
Kafka
Kafka的安装与配置
Kafka的下载页面:http://kafka.apache.org/downloads.html。下载之后解压:
[root@hidden util]# tar -zxvf kafka_2.8.9-0.8.1.1 |
/bin 启动和停止命令等
/config 配置文件
/libs 类库
启动
在Kafka启动之前需要先启动ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties |
之后启动Kafka Server:
bin/kafka-server-start.sh config/server.properties |
停止kafka:
bin/kafka-server-stop.sh |
停止zookeeper
bin/zookeeper-server-stop.sh |
测试
运行producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test |
运行consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning |
在producer端输入字符串并回车,查看consumer端是否显示。
关于kafka的介绍以及在java中如何操作kafka会在之后的博文中展开介绍。
参考资料
- 《大型分布式网站架构设计与实践》陈康贤著。
- Zookeeper 安装和配置
- Kafka 安装和测试