Run
StartControllers
ReplicaSet
ReplicaSetController
syncReplicaSet
Summary
Run确立目标
理解 kube-controller-manager 的运行机制
从主函数找到run函数,代码较长,这里精简了一下
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// configz 模块,在kube-scheduler分析中已经了解
if cfgz, err := configz.New(ConfigzName); err == nil {
cfgz.Set(c.ComponentConfig)
} else {
klog.Errorf("unable to register configz: %v", err)
}
// 健康监测与http服务,跳过
var checks []healthz.HealthChecker
var unsecuredMux *mux.PathRecorderMux
run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
// client认证相关
var clientBuilder controller.ControllerClientBuilder
// 创建controller的上下文context
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
// 这里的 InformerFactory 和我们在kube-scheduler中看的 SharedInformerFactory 基本一致
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)
select {}
}
// 是否进行选举
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
panic("unreachable")
}
// 拼接出一个全局唯一的id
id, err := os.Hostname()
if err != nil {
return err
}
id = id + "_" + string(uuid.NewUUID())
rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
c.LeaderElectionClient.CoreV1(),
c.LeaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
})
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
// 正常情况下都是阻塞在RunOrDie这个函数中,不停地进行选举相关的工作
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
// 开始成为Leader的时候,调用run函数
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
WatchDog: electionChecker,
Name: "kube-controller-manager",
})
panic("unreachable")
}
StartControllers
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
// 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数
for controllerName, initFn := range controllers {
// 是否允许启动
if !ctx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
klog.V(1).Infof("Starting %q", controllerName)
// 调用init函数进行启动
debugHandler, started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
// 注册对应controller到debug的url中
if debugHandler != nil && unsecuredMux != nil {
basePath := "/debug/controllers/" + controllerName
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
}
klog.Infof("Started %q", controllerName)
}
return nil
}
// 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,不一一细讲
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
controllers["nodelifecycle"] = startNodeLifecycleController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
}
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
controllers["ephemeral-volume"] = startEphemeralVolumeController
return controllers
}
ReplicaSet
由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。
但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。 因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。
简单来说,ReplicaSet 就是用来生成指定个数的Pod
代码在pkg/controller/replica_set.go
ReplicaSetControllerfunc startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}
// 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer
// 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察者两种资源
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}
// 运行函数
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
controllerName := strings.ToLower(rsc.Kind)
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)
if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}
for i := 0; i < workers; i++ {
// 工作的函数
go wait.Until(rsc.worker, time.Second, stopCh)
}
<-stopCh
}
func (rsc *ReplicaSetController) worker() {
// 继续查找实现
for rsc.processNextWorkItem() {
}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
// 这里也有个queue的概念,可以类比kube-scheduler中的实现
// 不同的是,这里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具体实现今天不细看
// 获取元素
key, quit := rsc.queue.Get()
if quit {
return false
}
defer rsc.queue.Done(key)
// 处理对应的元素
err := rsc.syncHandler(key.(string))
if err == nil {
rsc.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)
return true
}
// 再回过头,去查看syncHandler的具体实现
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
rsc.syncHandler = rsc.syncReplicaSet
return rsc
}
syncReplicaSet
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
}()
// 从key中拆分出 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 根据name,从 Lister 获取对应的 ReplicaSets 信息
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
// 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil
}
// 根据namespace和labels获取所有的pod
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}
// 过滤无效的pod
filteredPods := controller.FilterActivePods(allPods)
// 根据selector再过滤pod
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
if err != nil {
return err
}
var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
// 管理 ReplicaSet,下面详细分析
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// 更新状态
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
return err
}
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
// 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
// diff = 当前pod数 - 期望pod数
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}
// diff小于0,表示需要扩容,即新增Pod
if diff < 0 {
// 具体的实现暂时不细看
// diff 大于0,即需要缩容
} else if diff > 0 {
}
return nil
}
站在前人的肩膀上,向前辈致敬,Respect!
Summarykube-controller-manager 的核心思想是: 根据期望状态和当前状态,管理Kubernetes中的资源。 以ReplicaSet为例,它对比了定义声明的Pod数和当前集群中满足条件的Pod数,进行相对应的扩缩容。
以上就是Kubernetes controller manager运行机制源码解析的详细内容,更多关于Kubernetes controller manager的资料请关注软件开发网其它相关文章!