2、Zookeeper配置及集群管理
基本配置
存储配置
网络配置
集群相关配置
基本使用
Zookeeper节点
常用命令
监视器(Watcher)和通知机制
会话
四字命令
集群配置
Python基本操作
python实现节点监控
Python实现Leader选举
Python实现分布式锁
存储配置
网络配置
集群相关配置
基本使用
Zookeeper节点
常用命令
监视器(Watcher)和通知机制
会话
四字命令
集群配置
Python基本操作
python实现节点监控
Python实现Leader选举
Python实现分布式锁
基本配置
clientPort:客户端连接端口,客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
clientPort=2181
dataDir:数据文件目录,Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里。
dataDir=/data/zookeeper/data
dataLogDir:事物日志文件路径,默认在data下面;
tickTime:Client-Server通信心跳时间,Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位。tickTime这是zookeeper一个滴答的时常,zookeeper内部衡量时间单位就是tick,比如5个tick或者10个tick,所以tick是多长决定了很多配置是多长时间的,比如各种超时时间,都是2个tick或者5个tick,如果一个tick是2s,那么5个tick就是10秒;
tickTime=2000
存储配置
preAllocSize:表示为事物日志预分配的磁盘空间量,默认是65535kb,如果每次重写导致事物日志都不够那就需要考虑增大此值;
snapCount:指明没多少次事物执行一次快照操作,一般而言每事物平均大小在100字节;
autopurge.snapRetainCount:自动清理快照以及日志文件,指定需要保存的快照文件数量;
autopurge.purgeInterval:自动pruge操作的时间间隔,0表示不启动;
fsync.warningthresholdms:设定zookeeper把内存数据刷写到磁盘上时消耗的时间报警阈值,如果整个操作时间过程,应该发出警告;
weight.x=N:判定quorum的投票权重,默认1;
网络配置
maxClientCnxns:每个IP的最大并发连接数,每个客户端的并发连接数,并非总并发;
clientPortAddress:指定zookeeper监听的IP地址和端口;
minSessionTimeout:会话的最小超时时间,就是会话建立最少有这么长时间,一般而言默认是tick值的两倍;
maxSessionTimeout:会话的最大超时时间;
admin.enableServer:是否打开AdminServer,默认打开,新版本加入;
集群相关配置
initLimit:Leader-Follower初始通信时限,集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。
initLimit=5
syncLimit:Leader-Follower同步通信时限,集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)。
syncLimit=2
server.N:服务器名称与地址:配置集群各服务器的属性参数(服务器编号,服务器地址,Leader与Follower通信端口,选举端口,指定节点为observer),这个配置项的书写格式比较特殊,默认格式为server.N=YYY:A:B[:observer];
server.1=node01.cce.com:2888:3888
server.2=node02.cce.com:2888:3888
server.3=node03.cce.com:2888:3888
observer:指定该节点为一个observer;
leaderServer:默认情况下,Leader是接收读写请求,额外还要负责协调各Follower发来的事物等,可以指明1或者0,所有的客户端都连到Follower,只不过写请求会经由Follower转给Leader,读操作由各Follower直接提供;
cnxTimeout:Leader选举期间各服务器创建TCP连接的超时时长;
ellectionAlg:选举算法,目前仅支持FastLeaderElection一种;
基本使用
Zookeeper是基于JAVA语言实现的,所以我们实现需要下载好JDK;
# 配置JDK
[root@node01 ~]# tar xf jdk-8u212-linux-x64.tar.gz
[root@node01 ~]# mv jdk1.8.0_212/ /usr/local/jdk
[root@node01 ~]# cat >> /etc/profile << EOF
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:/jre/lib/*:/lib/*
export PATH=\$PATH:\${JAVA_HOME}/bin:\${NODE_HOME}/bin
EOF
[root@node01 ~]# source /etc/profile
[root@node01 ~]# java -version
java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
# 安装zookeeper
[root@node01 ~]# tar xf apache-zookeeper-3.6.1-bin.tar.gz
[root@node01 ~]# mv apache-zookeeper-3.6.1-bin /usr/local/zookeeper
# 配置zookeeper
[root@node01 ~]# cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
[root@node01 ~]# grep '^[^#]' /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
admin.enableServer=false
server.1=172.16.1.1:2888:3888
[root@node01 ~]# mkdir -p /data/zookeeper
# 启动zookeeper
[root@node01 ~]# /usr/local/zookeeper/bin/zkServer.sh start
[root@node01 ~]# netstat -ntlp|grep java
tcp6 0 0 :::34797 :::* LISTEN 24522/java
tcp6 0 0 :::8080 :::* LISTEN 24522/java
tcp6 0 0 :::2181 :::* LISTEN 24522/java
# 测试创建一个节点
[zk: localhost:2181(CONNECTED) 17] create -e /eureka 'caidaye'
# 列出指定路径下的所有一级节点
[zk: localhost:2181(CONNECTED) 26] ls /eureka
# 查看节点的节点信息
[zk: localhost:2181(CONNECTED) 27] stat /eureka
# 获取指定节点的内容
[zk: localhost:2181(CONNECTED) 26] get /eureka
# 更新节点的数据
[zk: localhost:2181(CONNECTED) 28] set /eureka 'caichangen'
# 删除节点
[zk: localhost:2181(CONNECTED) 30] delete /eureka
# 创建一个父节点
[zk: localhost:2181(CONNECTED) 31] create /eureka 'This is the parent node'
# 在父节点下面创建子节点
[zk: localhost:2181(CONNECTED) 32] create /eureka/eureka_node1 'This is a child node'
# 查看父节点下面的子节点
[zk: localhost:2181(CONNECTED) 41] ls /eureka
[eureka_node1]
# 查看节点信息
[zk: localhost:2181(CONNECTED) 42] stat /eureka/eureka_node1
cZxid = 0x2a
ctime = Thu May 28 10:40:49 CST 2020
mZxid = 0x2a
mtime = Thu May 28 10:40:49 CST 2020
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 20
numChildren = 0
Zookeeper节点
Zookeeper的节点类型主要有两种,分别是持久和临时节点,持久节点只能使用delete或者rm来删除,临时节点在客户端连接断开之后自动删除,一般来讲应有程序需要在Znode上持久保存数据,而且在意外应用崩溃之后依然能够使用这些节点,那么我们应该使用持久节点否则就应该使用临时节点;
比如当Master以为离线之后,我们接下来为了能够让其他的Follower重新选举出新的Leader我们就应该使用临时节点,临时节点在对应的会话结束之后就会被删除,所以临时节点不应该有子节点,所以使用create -e来创建子节点是不被允许的;
一个节点还能被赋予一定的顺序,顺序节点的特点在于在创建时,被分配一个特定的唯一且独占的递增型的数字当作序号,此顺序号码也会被附加在节点路径之后用来唯一标识自己,因此基于组合的方式Zookeeper可以生成四钟节点,持久节点、持久顺序节点、临时节点、临时顺序节点;
当然我们的顺序节点和我们的临时持久不是一样的,它也不算是一种类型,它只是一种属性;
常用命令
create [-s] [-e] [-c] [-t ttl] path [data] [acl]:-s持久节点,-e临时节点;
connect:连接到指定的zookeeper
stat:节点的状态信息;
cZxid:标识该节点是由哪一个事物创建所产生的,这个标识它的事物编号;
ctime:节点的创建时间;
mZxid:最近更新该节点的事物ID;
mtime:最近更新该节点的时间;
pZxid:标识该节点的子节点的列表最近一次修改的事物ID;
cversion:子节点的版本号,Zookeeper中的版本号默认是标识相关信息的修改次数的;
dataVersion:数据版本号,Zookeeper中的版本号默认是标识相关信息的修改次数的;
aclVersion:acl的版本号,Zookeeper中的版本号默认是标识相关信息的修改次数的;
ephemeralOwner:如果节点为临时节点,那么它的值为这个节点拥有者的;
dataLength:数据长度;
numChildren:子节点的个数;
监视器(Watcher)和通知机制
对于我们的Client来讲,还有一个Watcher的计时器,对于客户端来讲,Zookeeper是通过网络通信的远程服务,考虑到网络延迟,还有Zookeeper内部的并发访问控制机制等,如果基于客户端拉取模式来工作去获取特定节点的相关信息的改变,很有可能其服务时间代价比较高,我们客户端获取了这个数据之后,只要对方状态发生改变,我们就要立即做出相应操作的;
那么这种情况下,我们之能一遍一遍的去轮询请求,对于一个基于网络的操作来讲轮询的操作代价是非常非常的大的,为了避免这么一个情况,我们的监视器(Watcher)就出现了,Client端一旦有被人关注的数据节点存在时,那么监控者可以在这个节点上注册一个监听器,一旦有了监听器之后,这个数据发生改变,Zookeeper将会将其改变的事件,通知给注册这个数据的监听器的客户端;
但是监听器是一次性的,一旦注册了,那么Zookeeper也看到了事件发生改变也触发了,下次要想再使用监听器,还得重新注册,所以这里要注意,它是一次性的触发通知机制;
在节点上的某次修改操作,导致发生通知以后,如果需要在此节点上,重新注册一个监听器的话,那么前一个事件的触发之后与重新注册监听器之前,客户端有可能会发生改变,而客户端是一无所知的,因为中间是有断层的;
会话
每一个客户端与Zookeeper建立连接之后,客户端与Zookeeper之间是会建立一个会话的,这个会话在Zookeeper端是会始终被记录着的;
四字命令
对于我们的Zookeeper而已,它有很多四字命令,简单来讲就是对我们的数据做一些更新修改等操作,就是由四个字符所组成的命令,我们需要使用telnet或者nc向其发送命令;
ruok:检测是否正常;
stat:获取服务器的状态信息;
conf:获取服务器启动时使用的配置信息;
srvr:显示server端信息;
cons:显示当前服务器上所有的客户端;
wchs:查看服务器有哪些监听器;
envi:查看Zookeeper所使用的JDK的环境信息;
# 在zoo.cfg开启四字命令
4lw.commands.whitelist=*
[root@node01 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
# ruok
[root@node01 ~]# telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
ruok
imokConnection closed by foreign host.
# stat
[root@node01 ~]# telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
stat
Zookeeper version: 3.6.1--104dcb3e3fb464b30c5186d229e00af9f332524b, built on 04/21/2020 15:01 GMT
Clients:
/172.16.1.254:3799[1](queued=0,recved=14,sent=14)
/127.0.0.1:46218[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0.6154/3
Received: 16
Sent: 15
Connections: 2
Outstanding: 0
Zxid: 0x2b
Mode: standalone
Node count: 7
Connection closed by foreign host.
# conf
[root@node01 ~]# telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
conf
clientPort=2181
secureClientPort=-1
dataDir=/data/zookeeper/version-2
dataDirSize=134219400
dataLogDir=/data/zookeeper/version-2
dataLogSize=134219400
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
clientPortListenBacklog=-1
serverId=0
Connection closed by foreign host.
集群配置
集群中每一个服务器的ID相关信息,都应该与它数据目录下的myid保持一致;
# 分别在三台机器安装JDK
[root@node01 ~]# tar xf jdk-8u212-linux-x64.tar.gz
[root@node01 ~]# mv jdk1.8.0_212/ /usr/local/jdk
[root@node01 ~]# cat >> /etc/profile << EOF
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:/jre/lib/*:/lib/*
export PATH=\$PATH:\${JAVA_HOME}/bin:\${NODE_HOME}/bin
EOF
[root@node01 ~]# source /etc/profile
[root@node01 ~]# java -version
java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
# 分别在三台机器安装安装zookeeper
[root@node01 ~]# tar xf apache-zookeeper-3.6.1-bin.tar.gz
[root@node01 ~]# mv apache-zookeeper-3.6.1-bin /usr/local/zookeeper
# 分别在三台机器配置zookeeper
[root@node01 ~]# cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
[root@node01 ~]# mkdir -p /data/zookeeper/{data,logs}
[root@node01 ~]# grep '^[^#]' /usr/local/zookeeper/conf/zoo.cfgtickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
admin.enableServer=false
4lw.commands.whitelist=*
server.1=172.16.1.1:2888:3888
server.2=172.16.1.2:2888:3888
server.3=172.16.1.3:2888:3888
clientPort=2181
maxClientCnxns=30
# 设定各主机的myid,myid主要是为了来标识当前主机的唯一id号,集群情况下必须告知,否则Zookeeper自己是不知道哪个是自己的,启动会报错
[root@node01 ~]# echo 1 > /data/zookeeper/data/myid
[root@node01 ~]# /usr/local/zookeeper/bin/zkServer.sh start
[root@node01 ~]# netstat -ntlp|grep java
tcp6 0 0 :::39534 :::* LISTEN 7448/java
tcp6 0 0 172.16.1.1:3888 :::* LISTEN 7448/java
tcp6 0 0 :::2181 :::* LISTEN 7448/java
[root@node02 ~]# echo 2 > /data/zookeeper/data/myid
[root@node02 ~]# /usr/local/zookeeper/bin/zkServer.sh start
[root@node02 ~]# netstat -ntlp|grep java
tcp6 0 0 :::39534 :::* LISTEN 7448/java
tcp6 0 0 172.16.1.1:3888 :::* LISTEN 7448/java
tcp6 0 0 :::2181 :::* LISTEN 7448/java
[root@node03 ~]# echo 3 > /data/zookeeper/data/myid
[root@node03 ~]# /usr/local/zookeeper/bin/zkServer.sh start
[root@node03 ~]# netstat -ntlp|grep java
tcp6 0 0 :::39534 :::* LISTEN 7448/java
tcp6 0 0 172.16.1.1:3888 :::* LISTEN 7448/java
tcp6 0 0 :::2181 :::* LISTEN 7448/java
# 测试集群节点状态
[root@node01 ~]# telnet 172.16.1.1 2181 |grep Mode
stat
Connection closed by foreign host.
Mode: leader
[root@node01 ~]# telnet 172.16.1.2 2181 |grep Mode
stat
Mode: follower
Connection closed by foreign host.
[root@node01 ~]# telnet 172.16.1.3 2181 |grep Mode
stat
Mode: follower
Connection closed by foreign host.
Python基本操作
创建节点时,指定ephemeral和sequence都是真,前者表示节点时一个临时节点,后者表示是一个有序节点,综合起来就是一个有序临时节点。指定为临时节点,如果服务崩溃了,那么服务启动时注册的节点会被zookeeper·删除,这样,请求模块就会知道。创建节点时需要指定节点的名称,指定sequence为真,zookeeper就会自己给节点起名字,而且是有序的,这样我们就不用担心节点的名称冲突了。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/5/29 16:20
# @Author : CaiChangEn
# @Email : mail0426@163.com
# @Site : https://blog.doorta.com
# @Software: PyCharm
from kazoo.client import KazooClient
class zk(object):
def __init__(self):
self.zk = KazooClient(hosts='172.16.1.1:2181,172.16.1.2:2181,172.16.1.3:2181') # 初始化zk连接
self.zk.start() # 打开一个连接通道
def createNode(self):
self.zk.create(path='/eureka/eureka1',value=bytes('This is first eureka version 1',encoding='utf8'),makepath=True) # 创建一个Znode,makepath为递归创建
def getNode(self):
print(1)
result=self.zk.get_children('/',watch=False)
print(result)
print(2)
def editNode(self):
self.zk.set(path='/eureka/eureka1',value=bytes('This is first eureka version 2',encoding='utf8'))
def deleteNode(self):
self.zk.delete('/eureka',recursive=True) # 删除Znode,recursive为递归删除
if __name__ == '__main__':
z=zk()
z.createNode()
z.getNode()
z.editNode()
z.deleteNode()
python实现节点监控
程序对/kwsy这个节点进行监视,一旦有新的节点被创建或者节点被删除,都会触发my_func这个函数,如此,就获得了最新的子节点信息,拿到这些子节点信息后,就知道服务的IP地址,端口号等信息,接下来就是以轮换或者随机的方式向这些服务发送请求了。不过需要指出的是,服务挂掉以后,zookeeper删除节点有一个延迟,这就意味着,发送请求的模块不能只以监视的结果来判断服务是不是挂掉了,而是应该自己有一个方法来判断当前选中的服务是不是可用的,毕竟,我们是在用zookeeper做服务发现,不是用来发现哪个服务挂掉的。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/5/29 17:11
# @Author : CaiChangEn
# @Email : mail0426@163.com
# @Site : https://blog.doorta.com
# @Software: PyCharm
from kazoo.client import KazooClient
import time
zk = KazooClient(hosts='172.16.1.1:2181,172.16.1.2:2181,172.16.1.3:2181')
zk.start()
service_lst=[]
@zk.ChildrenWatch('/eureka')
def callback(children):
global service_lst
service_lst = children
children=zk.get_children(path='/eureka')
while True:
print(service_lst)
time.sleep(10)
zk.stop()
Python实现Leader选举
可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/5/29 17:11
# @Author : CaiChangEn
# @Email : mail0426@163.com
# @Site : https://blog.doorta.com
# @Software: PyCharm
from kazoo.client import KazooClient
import time
import uuid
import logging
logging.basicConfig()
my_id = uuid.uuid4()
def leader_func():
print("I am the leader {}".format(str(my_id)))
while True:
print("{} is working! ".format(str(my_id)))
time.sleep(3)
zk = KazooClient(hosts='172.16.1.1:2181,172.16.1.2:2181,172.16.1.3:2181')
zk.start()
election = zk.Election("/electionpath")
election.run(leader_func)
zk.stop()
Python实现分布式锁
锁的概念大家都熟悉,当我们希望某一件事在同一时间只有一个服务在做,或者某一个资源在同一时间只有一个服务能访问,这个时候,我们就需要用到锁;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/5/29 17:11
# @Author : CaiChangEn
# @Email : mail0426@163.com
# @Site : https://blog.doorta.com
# @Software: PyCharm
from kazoo.client import KazooClient
import time
import uuid
import logging
logging.basicConfig()
my_id = uuid.uuid4()
def work():
print("{} is working! ".format(str(my_id)))
zk = KazooClient(hosts='172.16.1.1:2181,172.16.1.2:2181,172.16.1.3:2181')
zk.start()
lock = zk.Lock("/lockpath", str(my_id))
print("I am {}".format(str(my_id)))
while True:
with lock:
work()
time.sleep(3)
zk.stop()