typoracopyimagesto: imgs


Zookeeper使用

1、Zookeeper简介

 Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,为大型分布式系统提供开源分布式配置服务同步服务命名注册。ZooKeeper原本是Hadoop一个子项目,但现在它本身已经是一个顶级项目了。

 zookeeper经典分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力分布式协调存储服务

2、使用Docker快速部署zookeeper

2.1、Docker官方镜像

Docker Zookeeper

2.2、Docker安装zookeeper

下载zookeeper最新版镜像

docker search zookeeper 
docker pull zookeeper
docker images
docker inspect zookeeper 

docker inspect zookeeper用来查看zookeeper的详细信息

在/root/docker/目录新建一个zookeeper挂载文件夹

mkdir /root/docker/zookeeper

挂载本地文件夹并启动服务

docker run -e TZ="Asia/Shanghai" -d -p 2181:2181 -v /root/docker/zookeeper:/data --name zookeeper --restart always zookeeper

参数解释

-e TZ="Asia/Shanghai" # 指定上海时区
-d # 指示后台运行容器
-p 2181:2181 # 端口映射前面端口本地的2181端口,后者为容器内的端口
--name # 设置创建容器名称
-v # 挂在文件,将本地目录文件挂在到容器指定目录
--restart always # 始终重新启动zookeeper

2.3、进入zookeeper容器客户端

方式
docker run -it --rm --link zookeeper:zookeeper zookeeper zkCli.sh -server zookeeper       

运行上诉命令后会进入zkCli

在这里插入图片描述

方式

前台进入zookeeper容器执行脚本新建一个Client

 docker exec -it zookeeper bash   //进入zookeeper容器退出时不会关闭容器
 ./bin/zkCli.sh		//执行脚本新建一个Client    

3、docker构建zookeeper集群

3.1、创建docker-compose.yml文件

cd /root/docker/docker-compose/zookeeper # 进入你想要存放docker-compose.yml文件
vim docker-compose.yml		# 把下面的代码复制docker-compose.yml文件version: 'latest'
services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo1/data:/data
      - /root/docker/docker-compose/zoo1/datalog:/datalog

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo2/data:/data
      - /root/docker/docker-compose/zoo2/datalog:/datalog

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo3/data:/data
      - /root/docker/docker-compose/zoo3/datalog:/datalog

volumes表示的是文件映射要根据自己的情况进行修改

3.2、执行构建集群命令

3.2.1、安装Docker Compose(Linux下)

我们先要搭建以下docker-compose的环境,执行以下命令下载Docker Compose,要更改版本的话替换v2.2.2就好

sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

赋予可执行权限

sudo chmod +x /usr/local/bin/docker-compose

创建软链接

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
3.2.2、启动docker-compose.yml文件

执行命令之前要保证2181-2183端口没有占用

cd /root/docker/docker-compose/zookeeper  # 到docker-compose.yml所在的目录
docker-compose up -d
3.2.3、验证集群是否搭建成功
docker exec -it zoo1 bash
zkServer.sh status

看到如下信息表示集群已经搭建成功,可以看出zoo1的角色是follower,查看其他的会发现zoo2是follower,zoo3是leader

在这里插入图片描述

4、Zookeeper的简单使用

3.1、Go语言与Zookeeper服务端建立连接

3.1.1、建立连接代码
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)
//打印节点状态信息函数
func StatePrintf(state *zk.Stat) {
	fmt.Println("State->")
	fmt.Printf("Czxid = %v,nMzxid = %v,nCtime = %v,nMtime = %v,nVersion = %v,nCversion = %v,nAversion = %v,nEphemeralOwner = %v,nDataLength = %v,nNumChildren = %v,nPzxid = %v,n",
		state.Czxid,          //创建该节点的zxid
		state.Mzxid,          //最后一个修改节点的zxid
		state.Ctime,          //创建该节点时间
		state.Mtime,          //最后一次修改节点时间
		state.Version,        //修改节点数据次数
		state.Cversion,       //修改节点儿子节点次数
		state.Aversion,       //修改ACL的次数
		state.EphemeralOwner, //创建该临时节点会话id
		state.DataLength,     //节点数据长度
		state.NumChildren,    //该节点的儿子节点的数量
		state.Pzxid,          //最后一个修改的儿子节点的zxid(当创建或者删除子节点时才会改变)
	)
}
//输出错误信息函数
func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	CreateNode(conn)
	SetTest(conn)
	GetTest(conn)
	DeleteTest(conn)
    //休眠5分钟关闭以便看到创建的临时节点
	time.Sleep(5 * time.Minute)
	defer conn.Close()
}
3.1.2、关于zxid的一些解释
Zookeeper中的**zxid**是一个长为64位的数字。高32位用来表示当前Leader的周期,低32位用来表示当前请求产生的事务在当前Leader周期内的顺序。每产生一个新的事务,zxid的低32位就会自动加1。当zxid达到最大值,即zxid的低32位达到`0xffffffff`,就会触发集群强制选主,Leader变更后高32位都会自增1,并重置zxid低32位的计数值(zxid高32位变为新Leader的周期,低32位变为0)。

如果一个zookeeper集群每秒操作10000次,即10k/s ops,那么
	2^32/(86400*10000)≈4.97天

也就是说4.97天之后就会进行自动切主的操作,对于一些服务来说平均五天切一次主是难以容许的,我们可以重新设计zxid,增加低位技术位数自己需要的值,假设64位全部用做低位计数

	2^64/(86400*10000)≈21350398233.46天,即58494241.73年

一般来说集群可能可靠运行这么多年,所以重新设计zxid还是要根据业务需求来进行。

3.2、创建节点

3.2.1、zkCli操作
create /节点路径 value  # 可以在创建节点的同时设置节点的值,创建的节点是持久化的节点
create -e /节点路径 value  # 创建临时节点,在客户端断开后会自动删除的节点
create -s /节点路径 value  # 创建顺序节点,zookeeper会自动在节点路径后面加顺序递增的编号

直接创建节点

在这里插入图片描述

临时节点,顺序节点,quit退出之后再进入刚创建的temp节点会消失

在这里插入图片描述

3.2.2、Go语言API操作
// 创建节点
func CreateNode(conn *zk.Conn) {
	//创建永久节点
	path, err := conn.Create("/app3", []byte("zhangsan"), 0, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create node", err)
	fmt.Printf("Created node path[%v]n", path)

	//创建临时节点,在会话结束时会自动删除临时节点
	ephemeral, err := conn.Create("/ephemeral", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create ephemeral node", err)
	fmt.Printf("Created ephemeral node path[%v]n", ephemeral)

	//创建顺序节点
	sequence, err := conn.Create("/sequence", nil, zk.FlagSequence, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create sequence node", err)
	fmt.Printf("Created sequence node path[%v]n", sequence)

	//创建临时顺序节点 create -es /ephemeralsequece
	ephemeralsequece, err := conn.Create("/ephemeralsequece", nil, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create ephemeralsequece node", err)
	fmt.Printf("Created ephemeralsequece node path[%v]n", ephemeralsequece)
}

3.3、修改节点

3.3.1、zkCli操作
set /节点路径 value

例如

在这里插入图片描述

3.3.2、Go语言API操作
// Set操作
func SetTest(conn *zk.Conn) {
	//获取节点的version信息
	_, state, _ := conn.Get("/app3")

	//set /app3 lisi
	state, err := conn.Set("/app3", []byte("lisi"), state.Version)
	FailOnError("Failed to Set Value", err)
	StatePrintf(state)

	//获取修改后的值
	value, _, err := conn.Get("/app3")
	FailOnError("Failed to Get New value", err)
	fmt.Println("New Value = ", value)
}

3.4、查询节点

3.4.1、zkCli操作
ls 目录   # 查看目录下的所有子节点 
get /节点路径
ls -s 目录   # 查看目录的所有详细信息

例如

在这里插入图片描述

在这里插入图片描述

3.4.2、Go语言API操作
// 查询节点
func GetTest(conn *zk.Conn) {
	result, state, err := conn.Get("/app3")
	//获取子节点
	//children,state,err:=conn.Children("/app3")
	FailOnError("Failed to Get Node Info", err)
	fmt.Printf("result:[%v]n", string(result))
	StatePrintf(state)
}

3.5、删除节点

3.5.1、zkCli操作
delete /节点路径   # 删除单个节点
deleteall /节点路径 	# 删除带有子节点的节点

例如

在这里插入图片描述

3.5.2、Go语言API操作
// 删除节点
func DeleteTest(conn *zk.Conn) {
	path := "/app3"
	//先判断节点存不存在
	exists, state, _ := conn.Exists(path)
	fmt.Printf("path[%s] exists:%vn", path, exists)
	//删除节点
	err := conn.Delete("/app3", state.Version)
	FailOnError("Failed to Delete node", err)
	fmt.Printf("path[%s] is deleted.", path)

	exists, _, _ = conn.Exists(path)
	fmt.Printf("path[%s] exists: %vn", path, exists)
}

5、go-zookeeper权限(ACL)

zookeeper的节点有五种权限:Create、Read、Write、Delete、Admin

ACL权限schema:id:permissions组成

schema有四种方式

下面对这四种方式测试一遍

4.1、world

默认方式,相当于全世界都能访问

/app3节点的权限修改crwa尝试删除其子节点 /p1

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/app3节点的acl信息
	acl, state, err := conn.GetACL("/app3")
	FailOnError("Failed to GetACL", err)
	fmt.Println("nget acl:")
	fmt.Println("scheme =", acl[0].Scheme)
	fmt.Println("id =", acl[0].ID)
	fmt.Println("permissions =", acl[0].Perms)

	//修改/app3节点的权限修改为crwa
	perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin
	_, err = conn.SetACL("/app3", zk.WorldACL(int32(perms)), state.Aversion)
	FailOnError("Failed to SetACL", err)
	fmt.Println("SetAcl successful.")

	//create child node
	_, err = conn.Create("/app3/p1", nil, 0, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create node", err)

	//get state of child node
	_, state, err = conn.Get("/app3/p1")
	FailOnError("Failed to Get node info", err)

	//delete /app3/p1
	err = conn.Delete("/app3/p1", state.Version)
	FailOnError("Failed to Delete Node", err)
}

测试结果如下:因为我们没有赋予/app3节点Delete权限,即使子结点/p1赋予了全部权限也不能删除该子节点。

在这里插入图片描述

4.2、auth

auth 用来授予用户权限,所以需要先创建用户

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/auth节点的状态信息
	_, state, err := conn.Get("/auth")
	FailOnError("Failed to Get node info", err)

	//用户授权用户存在的话会新建
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to AddAuth", err)

	acl := zk.ACL{
		Scheme: "auth",
		Perms:  zk.PermAll,
		ID:     "user1:123456",
	}

	//为用户授权
	_, err = conn.SetACL("/auth", []zk.ACL{acl}, state.Version)
	FailOnError("Failed to SetACL", err)
	fmt.Println("AddAuthSuccess")
}

在这里插入图片描述

授权成功之后如果在其他连接中要查询节点信息要先验证用户信息才能进入下一步操作,也就是把conn.AddAuth操作提前,如果使用正确用户名密码,得到的会是同样的用户认证失败结果

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//先进行访问/auth节点的话会报错
	_, _, err = conn.Get("/auth")
	if err != nil {
		fmt.Println("Get Node info error:", err)
	}

	//要先进行AddAuth操作
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to Add Auth", err)

	//再获取/auth节点的信息就不会报错
	acl, _, err := conn.GetACL("/auth")
	FailOnError("Failed to Get node info", err)
	fmt.Println("acl 信息:")
	for i := 0; i < len(acl); i++ {
		fmt.Println("scheme =", acl[0].Scheme)
		fmt.Println("id =", acl[0].ID)
		fmt.Println("permissions =", acl[0].Perms)
	}
}

测试结果来看在未进行AddAuth操作时我们是获取不到/auth节点信息的,节点的密码返回的是加密后的密码

在这里插入图片描述

4.3、digest

digestauth基本相同唯一区别在于设置权限时,密码需要使用密文。

zk golang 库中有专为digest构造方法

zk.DigestACL(perms int32, user, password string)

方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递

使用示例

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/digest
	_, state, err := conn.Get("/digest")
	FailOnError("Failed to Get node info", err)

	//用户授权用户存在的话会新建
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to AddAuth", err)

	//zk.DigestACL会将传入的明文转换成密文acl
	acl := zk.DigestACL(zk.PermAll, "user1", "123456")

	//为用户授权
	_, err = conn.SetACL("/digest", acl, state.Version)
	FailOnError("Failed to SetACL", err)
	fmt.Println("节点[/digest]已对用户 user1 授权")
}

4.4、ip

ip 权限顾名思义,就是限制 ip 地址访问权限。

把节点的权限设置给指定的 ip 地址后,其他 ip无法访问该节点。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()
	//获取/ip节点的状态信息
	_, state, err := conn.Get("/ip")
	FailOnError("Failed to Get node info", err)

	acl := zk.ACL{
		Scheme: "ip",
		Perms:  zk.PermAll,
		ID:     "192.168.17.1",
	}

	//为用户授权
	_, err = conn.SetACL("/ip", []zk.ACL{acl}, state.Aversion)
	FailOnError("Failed to SetACL", err)
	fmt.Println("节点[/ip]已对用户 192.168.17.1 授权")

	//获取以下节点的acl权限
	acls, _, err := conn.GetACL("/ip")
	FailOnError("Failed to Get node info", err)
	fmt.Println("acl 信息:")
	for i := 0; i < len(acls); i++ {
		fmt.Println("scheme =", acls[0].Scheme)
		fmt.Println("id =", acls[0].ID)
		fmt.Println("permissions =", acls[0].Perms)
	}
}

这里我用的是VMware的虚拟机的zookeeper然后本地Windows连接zookeeper发送消息这个过程会经过虚拟路由转发,所以这里授权的ip地址是VMware虚拟网卡地址,不然的话会报没有权限的错误

在这里插入图片描述

在这里插入图片描述

6、watch机制

5.1、watch事件类型

watch 用来实现发布/订阅功能能够多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册

go-zookeeper监听的事件类型分为五种:

5.2、监听的方式

方式一、全局监听

全局监听的方式会在有监听事件发生时会执行监听器回调函数

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	callbackOption := zk.WithEventCallback(callback)
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5, callbackOption)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	//注册一个监听事件
	exists, state, _, err := conn.ExistsW("/global")
	if err != nil {
		log.Println(err)
	}
	//如果节点不存在则创建
	if !exists {
		//创建一个临时的global节点
		_, err = conn.Create("/global", []byte("globaltest"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err != nil {
			log.Println(err)
		}
		//在注册一个监听事件监听/global节点的删除
		_, state, _, err = conn.ExistsW("/global")
		if err != nil {
			log.Println(err)
		}
	}
	err = conn.Delete("/global", state.Version)
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()
}

// 监听事件的回调函数
func callback(event zk.Event) {
	fmt.Println("###########################")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

在这里插入图片描述

测试结果

在这里插入图片描述

方式二、局部监听
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

// 输出错误信息函数
func FailOnError(msg string, err error) {
	if err != nil {
		log.Printf("%v : %vn", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	//先等待连接上再启动监听携程
	time.Sleep(5 * time.Second)
	go watchNodeCreated("/partial", conn)
	go watchNodeDataChanged("/partial", conn)
	go watchNodeChildrenChanged("/partial", conn)
	go watchNodeDeleted("/partial", conn)
	defer conn.Close()
	//等待操作结束
	time.Sleep(1 * time.Hour)
}

// 监听节点的创建事件
func watchNodeCreated(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Createn", path)
	for {
		//利用channel通信机制将Event数据传递给ch
		//ch:=make(chan Event)
		_, _, ch, err := conn.ExistsW(path)
		//当err为nil时ch才会有数据否者会阻塞协程
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeCreated {
				log.Printf("Node[%v] Createdn", path)
			}
		} else {
			FailOnError("Failed to watchNodeCreated", err)
		}
	}
}

// 监听节点数据修改事件
func watchNodeDataChanged(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Data Changen", path)
	for {
		_, _, ch, err := conn.GetW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeDataChanged {
				log.Printf("Node[%v] Data Changed", path)
			}
		}
	}
}

// 监听节点子节点的修改事件
func watchNodeChildrenChanged(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Children Change", path)
	for {
		_, _, ch, err := conn.ChildrenW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeChildrenChanged {
				log.Printf("Node[%v] Children Changed", path)
			}
		}
	}
}

// 监听节点的删除事件
func watchNodeDeleted(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Delete", path)
	for {
		_, _, ch, err := conn.ExistsW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeDeleted {
				log.Printf("Node[%v] Deleted", path)
			}
		} else {
			FailOnError("Failed to watchNodeDeleted", err)
		}
	}
}

启动程序之后在虚拟机运行客户程序执行以下命令

在这里插入图片描述

程序会输出以下结果

在这里插入图片描述

7、go-zookeeper实现分布式

zookeeper的分布式可以利用每个节点的唯一性来完成,但所有服务监听一个节点对于分布式系统来说完全是资源浪费。而zookeeper可以利用临时顺序节点来创建一个有序的临时节点列表来完成分布式锁:
  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁之后,将该节点删除。
  3. 如果发现自己创建的子节点并非所有子节点中最小的,说明自己没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  4. 如果发现比自己小的那个节点删除,则客户端的Watcher会收到相应的同支,此时再次判断自己创建的节点是否lock子节点中序最小的,如果是则获取到锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

假如服务 A 创建了节点 a,此时节点 a 的前面没有节点,所以服务 A 可以执行。此时服务 B 创建了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只需要监听节点 a 即可

也就是说,因为临时有序节点列表是有序的,所以每个服务只需要监听自己创建的节点的前一个节点即可

我们利用golanggoroutine模拟客户端实现分布式锁的过程,以下是50个goroutine进行抢锁的示例
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	var wg sync.WaitGroup

	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			//新建一个锁
			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))
			//加锁
			err = lock.LockWithData([]byte("it is a lock"))
			if err != nil {
				panic(err)
			}
			fmt.Println("第", n, "个 goroutine 获取到了锁")
			time.Sleep(time.Second) // 1 秒后释放锁
			//解锁
			lock.Unlock()
		}(i)
	}
	//等待协程运行结束
	wg.Wait()
}

运行结果如下(只截取了一部分)

在这里插入图片描述

原文地址:https://blog.csdn.net/m0_52530105/article/details/131318761

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_35288.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注