client-go gin的简单整合四-list-watch初探

  • A+
所属分类:kubernetes k8s client-go

背景:

完成了client-go gin的简单整合三(list列表相关再进阶关于Pods),恩如果有代理是可以看到每次的请求都要访问后端服务的,如何避免频繁调用后端apiserver呢?list-watch监听机制可以使用一下?

关于list-watch:

参照:https://blog.51cto.com/u_15127559/3377812(错别字好多?最后还是引用了沈老师的ppt上面的概念!),

  • list http短链接调用资源的api,获取列表。
  • 使用http长连接持续监听资源,有变化则返回一个WatchEvent

client-go informer

client-go k8s.io/client-go/tools/cache包informer对象对list-watch机制进行了封装

  • 初始化调用List api获得全量list 缓存(本地缓存)
  • 调用watch api watch资源,当资源发生变更通过一定机制维护缓存,减少访问apiserver的压力

个人觉得不错的文章Client-go源码分析之Reflector,华为云不错的视频list-watch机制原理详解,
client-go(kubernetes)的ListerWatcher解析.

client-go gin的简单整合四-list-watch

以deployment简单例子的开始

文件名** /src/service/test.go,监控deployment的变化, **开始其实是不是可以跟java是的弄一个单独的测试包?这里就简单操作了偷懒…

cache.NewInformer()开始:

cache.NewInformer()

![image.png](https://img-blog.csdnimg.cn/img_convert/2009eb16fbeaba0c892bbe93c1a768b6.png#clientId=ub13ab056-ab14-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=u41974578&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=223053&status=done&style=none&taskId=u844e08a3-c837-4912-8375-6e3eaae4273&title=&width=1920)
![image.png](https://img-blog.csdnimg.cn/img_convert/5460fa6cc0d07589e7ee5459c4a0d412.png#clientId=ub13ab056-ab14-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=uc7dc9206&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=281241&status=done&style=none&taskId=u9435994c-f0f9-45d1-8b60-3088872a3f1&title=&width=1920)

goland查看源码功能:

![image.png](https://img-blog.csdnimg.cn/img_convert/39b3e755d73a925943de6c2d4814d811.png#clientId=ub13ab056-ab14-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=272&id=u5fb7a7cb&margin=[object Object]&name=image.png&originHeight=272&originWidth=778&originalType=binary&ratio=1&rotation=0&showTitle=false&size=36434&status=done&style=none&taskId=u41fc8615-2331-4dbf-8bd1-6355abac4ef&title=&width=778)
![image.png](https://img-blog.csdnimg.cn/img_convert/90a72c491bfa758ea6f2d92a121910f5.png#clientId=ub13ab056-ab14-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=u0b27478a&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=198003&status=done&style=none&taskId=u5e8f84bf-53c8-44fa-84f4-f8be35c8a59&title=&width=1920)
![image.png](https://img-blog.csdnimg.cn/img_convert/553a940b93ae52c87f17a774724e6736.png#clientId=ub13ab056-ab14-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=u6220b746&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=309811&status=done&style=none&taskId=u21049537-8c88-471f-b1e4-e5f9c1b2240&title=&width=1920)

实现Handler方法:

只实现了OnUpdate方法(仅仅打印deployment名字),OnAdd,OnDelete是空的:

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}

最终test.go如下:

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}
func main() {
	s, c := cache.NewInformer(cache.NewListWatchFromClient(lib.K8sClient.AppsV1().RESTClient(),
		"deployments", "default", fields.Everything()),
		&v1.Deployment{},
		0,
		&DepHandler{},
	)
	c.Run(wait.NeverStop)
	s.List()
}

关于s c 源码中Store, Controller
![image.png](https://img-blog.csdnimg.cn/img_convert/77b4be694ba65aa8666c8cfef3210ba2.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=393&id=ud541af1b&margin=[object Object]&name=image.png&originHeight=393&originWidth=753&originalType=binary&ratio=1&rotation=0&showTitle=false&size=65111&status=done&style=none&taskId=uaf0aca9d-0fc7-4766-8755-bc0b7dc420d&title=&width=753)

运行test.go

go run test.go
手动修改nginx deployment副本数量,查看 goland输出:

[zhangpeng@zhangpeng ~]$ kubectl get deployments
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl get deployment
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled
[zhangpeng@zhangpeng ~]$ 

![image.png](https://img-blog.csdnimg.cn/img_convert/8058e30c22b9d40587f3b07367bf0843.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=876&id=ub6fa2df5&margin=[object Object]&name=image.png&originHeight=876&originWidth=1109&originalType=binary&ratio=1&rotation=0&showTitle=false&size=155089&status=done&style=none&taskId=u12a31552-ea6a-4ce8-8abb-17818487f35&title=&width=1109)
注:resource 是deployments,不能是deployment, *DepHandler 为什么加指针运算符?

实现一个pod的list-watch?

首先停止test.go,并注释掉test.go中代码。
![image.png](https://img-blog.csdnimg.cn/img_convert/c13608b6f02df152ccaa0888c23a6bfc.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=596&id=u34053245&margin=[object Object]&name=image.png&originHeight=596&originWidth=1014&originalType=binary&ratio=1&rotation=0&showTitle=false&size=115314&status=done&style=none&taskId=u5d225787-a5ca-496d-8e9d-e2239220bbd&title=&width=1014)
创建了一个/src/service/test1.go,照着test.go来一遍:

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}
func main() {
	s, c := cache.NewInformer(cache.NewListWatchFromClient(lib.K8sClient.CoreV1().RESTClient(),
		"pods", "default", fields.Everything()),
		&corev1.Pod{},
		0,
		&PodHandler{},
	)
	c.Run(wait.NeverStop)
	s.List()
}

注意:pod 的api是 corev1。参照:https://github.com/kubernetes/client-go/blob/release-1.23/informers/core/v1/pod.go#L58
运行test1.go,修改default 下nginx deployment副本数:

[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled

![image.png](https://img-blog.csdnimg.cn/img_convert/e172534559215b2e91e397a13b81badd.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=u9527e07e&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=400199&status=done&style=none&taskId=u721488c0-daa5-4b88-b827-6c872b14adb&title=&width=1920)

SharedInformerFactory工厂模式

思考一下为什么要使用工厂模式呢?
关于SharedInformerFactory参考:https://stackoverflow.com/questions/40975307/how-to-watch-events-on-a-kubernetes-service-using-its-go-client,https://qiankunli.github.io/2020/07/20/client_go.html

先test1.go的pod开始:

![image.png](https://img-blog.csdnimg.cn/img_convert/2af162751495d553e151c9d516de3ce9.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=u791c0cc6&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=254171&status=done&style=none&taskId=u721db102-9e3b-4326-bdd7-5fee3a03b58&title=&width=1920)
![image.png](https://img-blog.csdnimg.cn/img_convert/80fd637fad055ecc91a18709563951fe.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=163&id=u402960f7&margin=[object Object]&name=image.png&originHeight=163&originWidth=954&originalType=binary&ratio=1&rotation=0&showTitle=false&size=31481&status=done&style=none&taskId=u69ecaede-142b-4f80-a004-2e91a19f022&title=&width=954)
src/service/test1.go

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	factory.Start(wait.NeverStop)
	select {}
}

写的时候以为直接corev1…发现是core().v1(),为什么要用select{}呢?参照:https://blog.csdn.net/cbmljs/article/details/93497415。阻塞,防止程序退出!运行test1.go程序,修改nginx deployment副本:

[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=4
deployment.apps/nginx scaled

![image.png](https://img-blog.csdnimg.cn/img_convert/0058f71fa03a1bf25405236dfa157ffb.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=u8e777982&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=473163&status=done&style=none&taskId=u55817d0e-3877-4259-bcff-7f7bc2b2a2a&title=&width=1920)
可不可以deployment pod list-watch搞在一起呢?现在写了一个test.go,test1.go?貌似也是可以的…先搞在一起,如下:

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	select {}
}

运行test1.go ,更改nginx deployment副本数量,打印了deployment name 和pod name!
![image.png](https://img-blog.csdnimg.cn/img_convert/42f59d29f9cb48d8e734f52c02eb8ace.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=udc66b302&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=442125&status=done&style=none&taskId=u07669972-2744-4914-a738-ae69613c53e&title=&width=1920)

Handler OnAdd

补全一下OnAdd方法,打印一下pod deployment列表:

func (p *PodHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*corev1.Pod).Name)
}
func (d *DepHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*v1.Deployment).Name)
}
package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*corev1.Pod).Name)
}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*v1.Deployment).Name)
}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	select {}
}

![image.png](https://img-blog.csdnimg.cn/img_convert/0f4996cb76c1ff4a3e34555894937911.png#clientId=u3c2429ff-0001-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1080&id=zoytw&margin=[object Object]&name=image.png&originHeight=1080&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=247315&status=done&style=none&taskId=u98df8330-5c0d-438f-af9d-6f1b34051f9&title=&width=1920)

sync.Map

为什么引用sync.Map呢?Go语言sync.Map(在并发环境中使用的map),还是考虑并发原因!
拿deployment为例,打印一下develop namespace命名空间下的deployment列表:
test1.go

package main

import (
	"context"
	"fmt"
	"github.com/gin-gonic/gin"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"log"
	"sync"
	"time"
)

type DeploymentMap struct {
	data sync.Map
}

func (depmap *DeploymentMap) Add(dep *v1.Deployment) {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		list = append(list.([]*v1.Deployment), dep)
		depmap.data.Store(dep.Namespace, list)
	} else {
		depmap.data.Store(dep.Namespace, []*v1.Deployment{dep})
	}
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {
	//fmt.Println(obj.(*v1.Deployment).Name)
	DepMap.Add(obj.(*v1.Deployment))
}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}

var DepMap *DeploymentMap

func init() {
	DepMap = &DeploymentMap{}
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	c, _ := context.WithTimeout(context.Background(), time.Second*3)
	select {
	case <-c.Done():
		log.Fatal("time out")
	default:
		r := gin.New()
		r.GET("/", func(c *gin.Context) {
			var res []string
			DepMap.data.Range(func(key, value interface{}) bool {
				if key == "develop" {
					for _, item := range value.([]*v1.Deployment) {
						res = append(res, item.Name)
					}
				}
				return true

			})
			c.JSON(200, res)

		})
		r.Run(":8080")
	}
}

没有作具体的路由,直接测试一下访问:http://127.0.0.1:8080/
![image.png](https://img-blog.csdnimg.cn/img_convert/d0b5e614e3c33f45b8fdfd32e0538d82.png#clientId=u62c474ba-b864-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=527&id=uf0fbca8e&margin=[object Object]&name=image.png&originHeight=527&originWidth=765&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41564&status=done&style=none&taskId=uf059bf9d-e7bf-427e-848b-0cda8ca2032&title=&width=765)

总结

  1. list-watch机制
  2. cache informer,informer工厂模式。
  3. handler实现
  4. sync.map
  5. 断言…
w3cjava