Istio-配置规则监听分发原理

2022-08-01 09:06:50

我们对于istio的操作基本上都是通过修改CRD资源来进行实现,无论是istio服务的配置(istiooperators.install.istio.io) 还是流量规则(virtualservices、serviceentries…)
接下来让我们走进源码去看看istio对资源是如何进行监听、转换并推送给envoy的.
下面是创建virtualservices资源后,istio对其进行响应流程图.

istio中的规则资源对应envoy中的配置是一对多的关系,也就是说istio中的一个规则可能对多个envoy配置有影响,比如Virtualservices规则就对Cluster、route这两个配置有影响

创建informer资源

既然istio采用informer的形式对资源进行监听,那么我们就看看它的创建方式
该方法由NewServer()->initControllers()->initConfigController调用

func(s*Server)initK8SConfigStore(args*PilotArgs)error{if s.kubeClient==nil{returnnil}//为istio涉及的CRD资源创建informer并为其添加监听事件,//事件触发的方法存放在configController.handler中// 增删改事件 都调用同一个方法
   configController, err:= s.makeKubeConfigController(args)if err!=nil{return err}// 这里创建WorkloadEntry资源控制器,WorkloadEntry资源这里不过多介绍// 该控制器完成对WorkloadEntry节点的健康检查以及优雅关闭功能
   s.XDSServer.WorkloadEntryController= workloadentry.NewController(configController, args.PodName, args.KeepaliveOptions.MaxServerConnectionAge)returnnil}

让我们进入makeKubeConfigController

func(s*Server)makeKubeConfigController(args*PilotArgs)(model.ConfigStoreController,error){// 通过这个名字我们应该就有了大概的猜测return crdclient.New(s.kubeClient, args.Revision, args.RegistryOptions.KubeOptions.DomainSuffix)}funcNew(client kube.Client, revision, domainSuffixstring)(model.ConfigStoreController,error){// 获取CRD资源的schemas,它实现了crd资源从gvk到gvr,gvr到gvk转换的功能
	schemas:= collections.Pilotif features.EnableGatewayAPI{
		schemas= collections.PilotGatewayAPI}returnNewForSchemas(client, revision, domainSuffix, schemas)}// 让我们看一下collections.PilotGatewayAPI里有什么
	PilotGatewayAPI= collection.NewSchemasBuilder().MustAdd(IstioExtensionsV1Alpha1Wasmplugins).MustAdd(IstioNetworkingV1Alpha3Destinationrules).MustAdd(IstioNetworkingV1Alpha3Envoyfilters).MustAdd(IstioNetworkingV1Alpha3Gateways).MustAdd(IstioNetworkingV1Alpha3Serviceentries).MustAdd(IstioNetworkingV1Alpha3Sidecars).MustAdd(IstioNetworkingV1Alpha3Virtualservices).MustAdd(IstioNetworkingV1Alpha3Workloadentries).MustAdd(IstioNetworkingV1Alpha3Workloadgroups).MustAdd(IstioNetworkingV1Beta1Proxyconfigs).MustAdd(IstioSecurityV1Beta1Authorizationpolicies).MustAdd(IstioSecurityV1Beta1Peerauthentications).MustAdd(IstioSecurityV1Beta1Requestauthentications).MustAdd(IstioTelemetryV1Alpha1Telemetries).MustAdd(K8SGatewayApiV1Alpha2Gatewayclasses).MustAdd(K8SGatewayApiV1Alpha2Gateways).MustAdd(K8SGatewayApiV1Alpha2Httproutes).MustAdd(K8SGatewayApiV1Alpha2Referencepolicies).MustAdd(K8SGatewayApiV1Alpha2Tcproutes).MustAdd(K8SGatewayApiV1Alpha2Tlsroutes).Build()

进入 NewForSchemas

funcNewForSchemas(client kube.Client, revision, domainSuffixstring, schemas collection.Schemas)(model.ConfigStoreController,error){
   schemasByCRDName:=map[string]collection.Schema{}for_, s:=range schemas.All(){// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
      name:= fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group())
      schemasByCRDName[name]= s}
   out:=&Client{
      domainSuffix:     domainSuffix,
      schemas:          schemas,
      schemasByCRDName: schemasByCRDName,
      revision:         revision,
      queue:            queue.NewQueue(1* time.Second),
      kinds:map[config.GroupVersionKind]*cacheHandler{},// 定义了informer事件处理方法,所有资源中的增删改事件都会遍历该方法调用
      handlers:map[config.GroupVersionKind][]model.EventHandler{},
      client:           client,
      istioClient:      client.Istio(),
      gatewayAPIClient: client.GatewayAPI(),
      crdMetadataInformer: client.MetadataInformer().ForResource(collections.K8SApiextensionsK8SIoV1Customresourcedefinitions.Resource().GroupVersionResource()).Informer(),
      beginSync:   atomic.NewBool(false),
      initialSync: atomic.NewBool(false),}// 会获取当前集群中的所有CRD,用来判断当前资源是否在当前的集群中注册
   known, err:=knownCRDs(client.Ext())for_, s:=range schemas.All(){// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
      name:= fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group())
      crd:=trueif_, f:= collections.Builtin.Find(s.Name().String()); f{
         crd=false}// 只要判断是CRD资源那么就为其创建informerif!crd{handleCRDAdd(out, name,nil)}else{if_, f:= known[name]; f{handleCRDAdd(out, name,nil)}else{
            scope.Warnf("Skipping CRD %v as it is not present", s.Resource().GroupVersionKind())}}}return out,nil}

接下来就到了我们本文的重点了informer的创建

funchandleCRDAdd(cl*Client, namestring, stop<-chanstruct{}){// 获取当前资源的schemas
   s, f:= cl.schemasByCRDName[name]
   resourceGVK:= s.Resource().GroupVersionKind()
   gvr:= s.Resource().GroupVersionResource()

   cl.kindsMu.Lock()var i informers.GenericInformervar ifactory startervar errerror// 这里对当前资源进行类型判断switch s.Resource().Group(){// 如果是gateway资源case gvk.KubernetesGateway.Group:
      ifactory= cl.client.GatewayAPIInformer()
      i, err= cl.client.GatewayAPIInformer().ForResource(gvr)// 如果是pod 等资源case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:
      ifactory= cl.client.KubeInformer()
      i, err= cl.client.KubeInformer().ForResource(gvr)case gvk.CustomResourceDefinition.Group:
      ifactory= cl.client.ExtInformer()
      i, err= cl.client.ExtInformer().ForResource(gvr)default:// 如果不是上面以上资源那么就是isito资源
      ifactory= cl.client.IstioInformer()
      i, err= cl.client.IstioInformer().ForResource(gvr)}// 光创建了informer还不够,我们还需要添加监听事件,这里我就不展开了// 原理很简单使用informer的AddEventHandler方法,添加监听事件方法// 增删改 都实现同一个方法,该方法里面循环调用了上面的handler方法
   cl.kinds[resourceGVK]=createCacheHandler(cl, s, i)if w, f:= crdWatches[resourceGVK]; f{
      scope.Infof("notifying watchers %v was created", resourceGVK)
      w.once.Do(func(){close(w.stop)})}if stop!=nil{// Start informer factory, only if stop is defined. In startup case, we will not start here as// we will start all factories once we are ready to initialize.// For dynamically added CRDs, we need to start immediately though
      ifactory.Start(stop)}}

添加handler方法

既然最终调用的是handler方法,那我们就看看handler中都有什么
有些资源的handler可能与大部队不太一样,因为实现原理与功能不一样
Serviceentries、Workloadentries、Workloadgroups这三个资源主要目的是提供服务注册的功能,Serviceentries提供了外部服务的注册、Workloadentries提供了内部服务自动注册的功能。
其余资源都使用下面方法

configHandler:=func(prev config.Config, curr config.Config, event model.Event){// 判断事件类型 更新资源状态deferfunc(){if event!= model.EventDelete{
         s.statusReporter.AddInProgressResource(curr)}else{
         s.statusReporter.DeleteInProgressResource(curr)}}()// 判断是否要发送更新请求if event== model.EventUpdate&&!needsPush(prev, curr){
      log.Debugf("skipping push for %s as spec has not changed", prev.Key())return}// 封装更新请求
   pushReq:=&model.PushRequest{
      Full:true,
      ConfigsUpdated:map[model.ConfigKey]struct{}{{// 这里并没有将修改后的配置信息全部放进来// 会在配置生成时根据名称与命名空间进行查询
         Kind:      curr.GroupVersionKind,
         Name:      curr.Name,
         Namespace: curr.Namespace,}:{}},// 版本,标识当前请求包的用途,比如配置更新.
      Reason:[]model.TriggerReason{model.ConfigUpdate},}
   s.XDSServer.ConfigUpdate(pushReq)}

分发

XDSServer.ConfigUpdate主要目的是将当前资源转化为envoy识别的配置然后发送给每个envoy

让我们继续往下追踪

`func_(_s*DiscoveryServer_)_ConfigUpdate_(_req*model.PushRequest_){// 用于审计使用
   inboundConfigUpdates.Increment()// 当前服务收到的配置更新的数量
   s.InboundUpdates.Inc_()// 将上面的pushReq结构体发送给pushChannel
   s.pushChannel<- req}

管道?那么谁在监听管道那,我们可以猜想一下,这里肯定是由一个服务器进行处理,显而易见就是DescoveryServer服务器,那么在它的那个阶段监听那?必然是start阶段.

func(s*DiscoveryServer)Start(stopCh<-chanstruct{}){// 启动_WorkloadEntryController用于对注册的边车应用进行健康检查_go s.WorkloadEntryController.Run(topCh)//  这里做了一步优化,它会将时间段内大量的req进行合并成一个包发送出去go s.handleUpdates_(_stopCh_)->debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)// 审计功能go s.periodicRefreshMetrics(stopCh)// 这里的功能就是,对配置信息进行生成通过stream分发出去go s.sendPushes(stopCh)->doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)}

让我们一一分析
s.handleUpdates

// 接下来istio进行了抖动分析等优化措施来避免全量分发funcdebounce(chchan*model.PushRequest, stopCh<-chanstruct{}, opts debounceOptions, pushFnfunc(req*model.PushRequest), updateSent*atomic.Int64){...
    
	push:=func(req*model.PushRequest, debouncedEventsint){// 这里会调用初始化req操作pushFn(req)
		updateSent.Add(int64(debouncedEvents))
		freeCh<-struct{}{}}....for{select{case<-freeCh:
			free=truepushWorker()case r:=<-ch:// If reason is not set, record it as an unknown reasoniflen(r.Reason)==0{
				r.Reason=[]model.TriggerReason{model.UnknownTrigger}}if!opts.enableEDSDebounce&&!r.Full{// trigger push now, just for EDSgofunc(req*model.PushRequest){pushFn(req)
					updateSent.Inc()}(r)continue}

			lastConfigUpdateTime= time.Now()if debouncedEvents==0{
				timeChan= time.After(opts.debounceAfter)
				startDebounce= lastConfigUpdateTime}
			debouncedEvents++

			req= req.Merge(r)case<-timeChan:if free{pushWorker()}case<-stopCh:return}}}func(s*DiscoveryServer)Push(req*model.PushRequest){// 如果为false则进行增量更新,而不是完全推送if!req.Full{
		req.Push= s.globalPushContext()
		s.dropCacheForRequest(req)
		s.AdsPushAll(versionInfo(), req)return}// 这里会初始化上下文信息
	oldPushContext:= s.globalPushContext()if oldPushContext!=nil{
		oldPushContext.OnConfigChange()// Push the previous push Envoy metrics.
		envoyfilter.RecordMetrics()}// PushContext is reset after a config change. Previous status is// saved.
	t0:= time.Now()

	versionLocal:= time.Now().Format(time.RFC3339)+"/"+ strconv.FormatUint(versionNum.Inc(),10)
	push, err:= s.initPushContext(req, oldPushContext, versionLocal)if err!=nil{return}
	initContextTime:= time.Since(t0)
	log.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
	pushContextInitTime.Record(initContextTime.Seconds())

	versionMutex.Lock()
	version= versionLocal
	versionMutex.Unlock()

	req.Push= push
	s.AdsPushAll(versionLocal, req)}// to the model ConfigStorageCache and Controller.func(s*DiscoveryServer)AdsPushAll(versionstring, req*model.PushRequest){...
	s.startPush(req)}// Send a signal to all connections, with a push event.func(s*DiscoveryServer)startPush(req*model.PushRequest){
    
	req.Start= time.Now()// 这里会获取所有的client,该值是在envoy节点注册到istio时创建的// 下一章将会讲解istio服务注册for_, p:=range s.AllClients(){// 这里会将conn 与 req包放入队列
		s.pushQueue.Enqueue(p, req)}}

s.sendPushes
这里我们可以看到最终又又又放入了一个队列当中,那么谁去从这个队列当中取数据那? s.sendPushes登场了

funcdoSendPushes(stopCh<-chanstruct{}, semaphorechanstruct{}, queue*PushQueue){for{select{case<-stopCh:returndefault:// We can send to it until it is full, then it will block until a pushes finishes and reads from it.// This limits the number of pushes that can happen concurrently
			semaphore<-struct{}{}// 从队列当中获取值
			client, push, shuttingdown:= queue.Dequeue()// 发送通知信息用于审计recordPushTriggers(push.Reason...)// Signals that a push is done by reading from the semaphore, allowing another send on it.
			doneFunc:=func(){
				queue.MarkDone(client)<-semaphore}

			proxiesQueueTime.Record(time.Since(push.Start).Seconds())var closed<-chanstruct{}//做一些deferif client.stream!=nil{
				closed= client.stream.Context().Done()}
  • 作者:迷茫路人
  • 原文链接:https://blog.csdn.net/a1023934860/article/details/125787691
    更新时间:2022-08-01 09:06:50